[SPARK-24691][SQL] Dispatch the type support check in FileFormat implementation
[spark.git] / sql / core / src / main / scala / org / apache / spark / sql / execution / datasources / parquet / ParquetFileFormat.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql.execution.datasources.parquet
19
20 import java.io.IOException
21 import java.net.URI
22
23 import scala.collection.JavaConverters._
24 import scala.collection.mutable
25 import scala.collection.parallel.ForkJoinTaskSupport
26 import scala.util.{Failure, Try}
27
28 import org.apache.hadoop.conf.Configuration
29 import org.apache.hadoop.fs.{FileStatus, Path}
30 import org.apache.hadoop.mapreduce._
31 import org.apache.hadoop.mapreduce.lib.input.FileSplit
32 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
33 import org.apache.parquet.filter2.compat.FilterCompat
34 import org.apache.parquet.filter2.predicate.FilterApi
35 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
36 import org.apache.parquet.hadoop._
37 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
38 import org.apache.parquet.hadoop.codec.CodecConfig
39 import org.apache.parquet.hadoop.util.ContextUtil
40 import org.apache.parquet.schema.MessageType
41
42 import org.apache.spark.{SparkException, TaskContext}
43 import org.apache.spark.internal.Logging
44 import org.apache.spark.sql._
45 import org.apache.spark.sql.catalyst.InternalRow
46 import org.apache.spark.sql.catalyst.expressions._
47 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
48 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
49 import org.apache.spark.sql.catalyst.util.DateTimeUtils
50 import org.apache.spark.sql.execution.datasources._
51 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
52 import org.apache.spark.sql.internal.SQLConf
53 import org.apache.spark.sql.sources._
54 import org.apache.spark.sql.types._
55 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
56
57 class ParquetFileFormat
58   extends FileFormat
59   with DataSourceRegister
60   with Logging
61   with Serializable {
62   // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
63   // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
64   // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
65   // here.
66   private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
67
68   override def shortName(): String = "parquet"
69
70   override def toString: String = "Parquet"
71
72   override def hashCode(): Int = getClass.hashCode()
73
74   override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]
75
76   override def prepareWrite(
77       sparkSession: SparkSession,
78       job: Job,
79       options: Map[String, String],
80       dataSchema: StructType): OutputWriterFactory = {
81     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
82
83     val conf = ContextUtil.getConfiguration(job)
84
85     val committerClass =
86       conf.getClass(
87         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
88         classOf[ParquetOutputCommitter],
89         classOf[OutputCommitter])
90
91     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
92       logInfo("Using default output committer for Parquet: " +
93         classOf[ParquetOutputCommitter].getCanonicalName)
94     } else {
95       logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
96     }
97
98     conf.setClass(
99       SQLConf.OUTPUT_COMMITTER_CLASS.key,
100       committerClass,
101       classOf[OutputCommitter])
102
103     // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
104     // it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The reason why
105     // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
106     // bundled with `ParquetOutputFormat[Row]`.
107     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
108
109     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
110
111     // This metadata is useful for keeping UDTs like Vector/Matrix.
112     ParquetWriteSupport.setSchema(dataSchema, conf)
113
114     // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
115     // schema and writes actual rows to Parquet files.
116     conf.set(
117       SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
118       sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
119
120     conf.set(
121       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
122       sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
123
124     // Sets compression scheme
125     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
126
127     // SPARK-15719: Disables writing Parquet summary files by default.
128     if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null
129       && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
130       conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE)
131     }
132
133     if (ParquetOutputFormat.getJobSummaryLevel(conf) == JobSummaryLevel.NONE
134       && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
135       // output summary is requested, but the class is not a Parquet Committer
136       logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
137         s" create job summaries. " +
138         s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
139     }
140
141     new OutputWriterFactory {
142       // This OutputWriterFactory instance is deserialized when writing Parquet files on the
143       // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
144       // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
145       // initialized.
146       private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
147
148         override def newInstance(
149           path: String,
150           dataSchema: StructType,
151           context: TaskAttemptContext): OutputWriter = {
152         new ParquetOutputWriter(path, context)
153       }
154
155       override def getFileExtension(context: TaskAttemptContext): String = {
156         CodecConfig.from(context).getCodec.getExtension + ".parquet"
157       }
158     }
159   }
160
161   override def inferSchema(
162       sparkSession: SparkSession,
163       parameters: Map[String, String],
164       files: Seq[FileStatus]): Option[StructType] = {
165     val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)
166
167     // Should we merge schemas from all Parquet part-files?
168     val shouldMergeSchemas = parquetOptions.mergeSchema
169
170     val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
171
172     val filesByType = splitFiles(files)
173
174     // Sees which file(s) we need to touch in order to figure out the schema.
175     //
176     // Always tries the summary files first if users don't require a merged schema.  In this case,
177     // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
178     // groups information, and could be much smaller for large Parquet files with lots of row
179     // groups.  If no summary file is available, falls back to some random part-file.
180     //
181     // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
182     // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
183     // how to merge them correctly if some key is associated with different values in different
184     // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
185     // implies that if a summary file presents, then:
186     //
187     //   1. Either all part-files have exactly the same Spark SQL schema, or
188     //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
189     //      their schemas may differ from each other).
190     //
191     // Here we tend to be pessimistic and take the second case into account.  Basically this means
192     // we can't trust the summary files if users require a merged schema, and must touch all part-
193     // files to do the merge.
194     val filesToTouch =
195       if (shouldMergeSchemas) {
196         // Also includes summary files, 'cause there might be empty partition directories.
197
198         // If mergeRespectSummaries config is true, we assume that all part-files are the same for
199         // their schema with summary files, so we ignore them when merging schema.
200         // If the config is disabled, which is the default setting, we merge all part-files.
201         // In this mode, we only need to merge schemas contained in all those summary files.
202         // You should enable this configuration only if you are very sure that for the parquet
203         // part-files to read there are corresponding summary files containing correct schema.
204
205         // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
206         // the ordering of the output columns. There are several things to mention here.
207         //
208         //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
209         //     the first part-file so that the columns of the lexicographically first file show
210         //     first.
211         //
212         //  2. If mergeRespectSummaries config is true, then there should be, at least,
213         //     "_metadata"s for all given files, so that we can ensure the columns of
214         //     the lexicographically first file show first.
215         //
216         //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
217         //     no guarantee of the output order, since there might not be a summary file for the
218         //     lexicographically first file, which ends up putting ahead the columns of
219         //     the other files. However, this should be okay since not enabling
220         //     shouldMergeSchemas means (assumes) all the files have the same schemas.
221
222         val needMerged: Seq[FileStatus] =
223           if (mergeRespectSummaries) {
224             Seq.empty
225           } else {
226             filesByType.data
227           }
228         needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
229       } else {
230         // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
231         // don't have this.
232         filesByType.commonMetadata.headOption
233             // Falls back to "_metadata"
234             .orElse(filesByType.metadata.headOption)
235             // Summary file(s) not found, the Parquet file is either corrupted, or different part-
236             // files contain conflicting user defined metadata (two or more values are associated
237             // with a same key in different files).  In either case, we fall back to any of the
238             // first part-file, and just assume all schemas are consistent.
239             .orElse(filesByType.data.headOption)
240             .toSeq
241       }
242     ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
243   }
244
245   case class FileTypes(
246       data: Seq[FileStatus],
247       metadata: Seq[FileStatus],
248       commonMetadata: Seq[FileStatus])
249
250   private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = {
251     val leaves = allFiles.toArray.sortBy(_.getPath.toString)
252
253     FileTypes(
254       data = leaves.filterNot(f => isSummaryFile(f.getPath)),
255       metadata =
256         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE),
257       commonMetadata =
258         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))
259   }
260
261   private def isSummaryFile(file: Path): Boolean = {
262     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
263         file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
264   }
265
266   /**
267    * Returns whether the reader will return the rows as batch or not.
268    */
269   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
270     val conf = sparkSession.sessionState.conf
271     conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
272       schema.length <= conf.wholeStageMaxNumFields &&
273       schema.forall(_.dataType.isInstanceOf[AtomicType])
274   }
275
276   override def vectorTypes(
277       requiredSchema: StructType,
278       partitionSchema: StructType,
279       sqlConf: SQLConf): Option[Seq[String]] = {
280     Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
281       if (!sqlConf.offHeapColumnVectorEnabled) {
282         classOf[OnHeapColumnVector].getName
283       } else {
284         classOf[OffHeapColumnVector].getName
285       }
286     ))
287   }
288
289   override def isSplitable(
290       sparkSession: SparkSession,
291       options: Map[String, String],
292       path: Path): Boolean = {
293     true
294   }
295
296   override def buildReaderWithPartitionValues(
297       sparkSession: SparkSession,
298       dataSchema: StructType,
299       partitionSchema: StructType,
300       requiredSchema: StructType,
301       filters: Seq[Filter],
302       options: Map[String, String],
303       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
304     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
305     hadoopConf.set(
306       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
307       requiredSchema.json)
308     hadoopConf.set(
309       ParquetWriteSupport.SPARK_ROW_SCHEMA,
310       requiredSchema.json)
311     hadoopConf.set(
312       SQLConf.SESSION_LOCAL_TIMEZONE.key,
313       sparkSession.sessionState.conf.sessionLocalTimeZone)
314
315     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
316
317     // Sets flags for `ParquetToSparkSchemaConverter`
318     hadoopConf.setBoolean(
319       SQLConf.PARQUET_BINARY_AS_STRING.key,
320       sparkSession.sessionState.conf.isParquetBinaryAsString)
321     hadoopConf.setBoolean(
322       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
323       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
324
325     val broadcastedHadoopConf =
326       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
327
328     // TODO: if you move this into the closure it reverts to the default values.
329     // If true, enable using the custom RecordReader for parquet. This only works for
330     // a subset of the types (no complex types).
331     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
332     val sqlConf = sparkSession.sessionState.conf
333     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
334     val enableVectorizedReader: Boolean =
335       sqlConf.parquetVectorizedReaderEnabled &&
336       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
337     val enableRecordFilter: Boolean =
338       sparkSession.sessionState.conf.parquetRecordFilterEnabled
339     val timestampConversion: Boolean =
340       sparkSession.sessionState.conf.isParquetINT96TimestampConversion
341     val capacity = sqlConf.parquetVectorizedReaderBatchSize
342     val enableParquetFilterPushDown: Boolean =
343       sparkSession.sessionState.conf.parquetFilterPushDown
344     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
345     val returningBatch = supportBatch(sparkSession, resultSchema)
346     val pushDownDate = sqlConf.parquetFilterPushDownDate
347     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
348
349     (file: PartitionedFile) => {
350       assert(file.partitionValues.numFields == partitionSchema.size)
351
352       val fileSplit =
353         new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
354       val filePath = fileSplit.getPath
355
356       val split =
357         new org.apache.parquet.hadoop.ParquetInputSplit(
358           filePath,
359           fileSplit.getStart,
360           fileSplit.getStart + fileSplit.getLength,
361           fileSplit.getLength,
362           fileSplit.getLocations,
363           null)
364
365       val sharedConf = broadcastedHadoopConf.value.value
366
367       // Try to push down filters when filter push-down is enabled.
368       val pushed = if (enableParquetFilterPushDown) {
369         val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
370           .getFileMetaData.getSchema
371         filters
372           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
373           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
374           // is used here.
375           .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
376           .createFilter(parquetSchema, _))
377           .reduceOption(FilterApi.and)
378       } else {
379         None
380       }
381
382       // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
383       // *only* if the file was created by something other than "parquet-mr", so check the actual
384       // writer here for this file.  We have to do this per-file, as each file in the table may
385       // have different writers.
386       def isCreatedByParquetMr(): Boolean = {
387         val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
388         footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
389       }
390       val convertTz =
391         if (timestampConversion && !isCreatedByParquetMr()) {
392           Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
393         } else {
394           None
395         }
396
397       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
398       val hadoopAttemptContext =
399         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
400
401       // Try to push down filters when filter push-down is enabled.
402       // Notice: This push-down is RowGroups level, not individual records.
403       if (pushed.isDefined) {
404         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
405       }
406       val taskContext = Option(TaskContext.get())
407       if (enableVectorizedReader) {
408         val vectorizedReader = new VectorizedParquetRecordReader(
409           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
410         val iter = new RecordReaderIterator(vectorizedReader)
411         // SPARK-23457 Register a task completion lister before `initialization`.
412         taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
413         vectorizedReader.initialize(split, hadoopAttemptContext)
414         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
415         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
416         if (returningBatch) {
417           vectorizedReader.enableReturningBatches()
418         }
419
420         // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
421         iter.asInstanceOf[Iterator[InternalRow]]
422       } else {
423         logDebug(s"Falling back to parquet-mr")
424         // ParquetRecordReader returns UnsafeRow
425         val reader = if (pushed.isDefined && enableRecordFilter) {
426           val parquetFilter = FilterCompat.get(pushed.get, null)
427           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
428         } else {
429           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
430         }
431         val iter = new RecordReaderIterator(reader)
432         // SPARK-23457 Register a task completion lister before `initialization`.
433         taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
434         reader.initialize(split, hadoopAttemptContext)
435
436         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
437         val joinedRow = new JoinedRow()
438         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
439
440         // This is a horrible erasure hack...  if we type the iterator above, then it actually check
441         // the type in next() and we get a class cast exception.  If we make that function return
442         // Object, then we can defer the cast until later!
443         if (partitionSchema.length == 0) {
444           // There is no partition columns
445           iter.asInstanceOf[Iterator[InternalRow]]
446         } else {
447           iter.asInstanceOf[Iterator[InternalRow]]
448             .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
449         }
450       }
451     }
452   }
453
454   override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
455     case _: AtomicType => true
456
457     case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
458
459     case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
460
461     case MapType(keyType, valueType, _) =>
462       supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
463
464     case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
465
466     case _ => false
467   }
468 }
469
470 object ParquetFileFormat extends Logging {
471   private[parquet] def readSchema(
472       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
473
474     val converter = new ParquetToSparkSchemaConverter(
475       sparkSession.sessionState.conf.isParquetBinaryAsString,
476       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
477
478     val seen = mutable.HashSet[String]()
479     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
480       val metadata = footer.getParquetMetadata.getFileMetaData
481       val serializedSchema = metadata
482         .getKeyValueMetaData
483         .asScala.toMap
484         .get(ParquetReadSupport.SPARK_METADATA_KEY)
485       if (serializedSchema.isEmpty) {
486         // Falls back to Parquet schema if no Spark SQL schema found.
487         Some(converter.convert(metadata.getSchema))
488       } else if (!seen.contains(serializedSchema.get)) {
489         seen += serializedSchema.get
490
491         // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
492         // whatever is available.
493         Some(Try(DataType.fromJson(serializedSchema.get))
494           .recover { case _: Throwable =>
495             logInfo(
496               "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
497                 "falling back to the deprecated DataType.fromCaseClassString parser.")
498             LegacyTypeStringParser.parse(serializedSchema.get)
499           }
500           .recover { case cause: Throwable =>
501             logWarning(
502               s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
503                  |\t$serializedSchema
504                """.stripMargin,
505               cause)
506           }
507           .map(_.asInstanceOf[StructType])
508           .getOrElse {
509             // Falls back to Parquet schema if Spark SQL schema can't be parsed.
510             converter.convert(metadata.getSchema)
511           })
512       } else {
513         None
514       }
515     }
516
517     finalSchemas.reduceOption { (left, right) =>
518       try left.merge(right) catch { case e: Throwable =>
519         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
520       }
521     }
522   }
523
524   /**
525    * Reads Parquet footers in multi-threaded manner.
526    * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
527    * files when reading footers.
528    */
529   private[parquet] def readParquetFootersInParallel(
530       conf: Configuration,
531       partFiles: Seq[FileStatus],
532       ignoreCorruptFiles: Boolean): Seq[Footer] = {
533     val parFiles = partFiles.par
534     val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
535     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
536     try {
537       parFiles.flatMap { currentFile =>
538         try {
539           // Skips row group information since we only need the schema.
540           // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
541           // when it can't read the footer.
542           Some(new Footer(currentFile.getPath(),
543             ParquetFileReader.readFooter(
544               conf, currentFile, SKIP_ROW_GROUPS)))
545         } catch { case e: RuntimeException =>
546           if (ignoreCorruptFiles) {
547             logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
548             None
549           } else {
550             throw new IOException(s"Could not read footer for file: $currentFile", e)
551           }
552         }
553       }.seq
554     } finally {
555       pool.shutdown()
556     }
557   }
558
559   /**
560    * Figures out a merged Parquet schema with a distributed Spark job.
561    *
562    * Note that locality is not taken into consideration here because:
563    *
564    *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
565    *     that file.  Thus we only need to retrieve the location of the last block.  However, Hadoop
566    *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
567    *     potentially expensive.
568    *
569    *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
570    *     slow.  And basically locality is not available when using S3 (you can't run computation on
571    *     S3 nodes).
572    */
573   def mergeSchemasInParallel(
574       filesToTouch: Seq[FileStatus],
575       sparkSession: SparkSession): Option[StructType] = {
576     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
577     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
578     val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
579
580     // !! HACK ALERT !!
581     //
582     // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
583     // to executor side to avoid fetching them again.  However, `FileStatus` is not `Serializable`
584     // but only `Writable`.  What makes it worse, for some reason, `FileStatus` doesn't play well
585     // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.  These
586     // facts virtually prevents us to serialize `FileStatus`es.
587     //
588     // Since Parquet only relies on path and length information of those `FileStatus`es to read
589     // footers, here we just extract them (which can be easily serialized), send them to executor
590     // side, and resemble fake `FileStatus`es there.
591     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
592
593     // Set the number of partitions to prevent following schema reads from generating many tasks
594     // in case of a small number of parquet files.
595     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
596       sparkSession.sparkContext.defaultParallelism)
597
598     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
599
600     // Issues a Spark job to read Parquet schema in parallel.
601     val partiallyMergedSchemas =
602       sparkSession
603         .sparkContext
604         .parallelize(partialFileStatusInfo, numParallelism)
605         .mapPartitions { iterator =>
606           // Resembles fake `FileStatus`es with serialized path and length information.
607           val fakeFileStatuses = iterator.map { case (path, length) =>
608             new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
609           }.toSeq
610
611           // Reads footers in multi-threaded manner within each task
612           val footers =
613             ParquetFileFormat.readParquetFootersInParallel(
614               serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
615
616           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
617           val converter = new ParquetToSparkSchemaConverter(
618             assumeBinaryIsString = assumeBinaryIsString,
619             assumeInt96IsTimestamp = assumeInt96IsTimestamp)
620           if (footers.isEmpty) {
621             Iterator.empty
622           } else {
623             var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
624             footers.tail.foreach { footer =>
625               val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
626               try {
627                 mergedSchema = mergedSchema.merge(schema)
628               } catch { case cause: SparkException =>
629                 throw new SparkException(
630                   s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
631               }
632             }
633             Iterator.single(mergedSchema)
634           }
635         }.collect()
636
637     if (partiallyMergedSchemas.isEmpty) {
638       None
639     } else {
640       var finalSchema = partiallyMergedSchemas.head
641       partiallyMergedSchemas.tail.foreach { schema =>
642         try {
643           finalSchema = finalSchema.merge(schema)
644         } catch { case cause: SparkException =>
645           throw new SparkException(
646             s"Failed merging schema:\n${schema.treeString}", cause)
647         }
648       }
649       Some(finalSchema)
650     }
651   }
652
653   /**
654    * Reads Spark SQL schema from a Parquet footer.  If a valid serialized Spark SQL schema string
655    * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
656    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
657    */
658   def readSchemaFromFooter(
659       footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
660     val fileMetaData = footer.getParquetMetadata.getFileMetaData
661     fileMetaData
662       .getKeyValueMetaData
663       .asScala.toMap
664       .get(ParquetReadSupport.SPARK_METADATA_KEY)
665       .flatMap(deserializeSchemaString)
666       .getOrElse(converter.convert(fileMetaData.getSchema))
667   }
668
669   private def deserializeSchemaString(schemaString: String): Option[StructType] = {
670     // Tries to deserialize the schema string as JSON first, then falls back to the case class
671     // string parser (data generated by older versions of Spark SQL uses this format).
672     Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
673       case _: Throwable =>
674         logInfo(
675           "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
676             "falling back to the deprecated DataType.fromCaseClassString parser.")
677         LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType]
678     }.recoverWith {
679       case cause: Throwable =>
680         logWarning(
681           "Failed to parse and ignored serialized Spark schema in " +
682             s"Parquet key-value metadata:\n\t$schemaString", cause)
683         Failure(cause)
684     }.toOption
685   }
686 }