52a18abb55241181015eb896b82088b60d6d5619
[spark.git] / sql / core / src / main / scala / org / apache / spark / sql / execution / datasources / parquet / ParquetFileFormat.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql.execution.datasources.parquet
19
20 import java.io.IOException
21 import java.net.URI
22
23 import scala.collection.JavaConverters._
24 import scala.collection.mutable
25 import scala.collection.parallel.ForkJoinTaskSupport
26 import scala.util.{Failure, Try}
27
28 import org.apache.hadoop.conf.Configuration
29 import org.apache.hadoop.fs.{FileStatus, Path}
30 import org.apache.hadoop.mapreduce._
31 import org.apache.hadoop.mapreduce.lib.input.FileSplit
32 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
33 import org.apache.parquet.filter2.compat.FilterCompat
34 import org.apache.parquet.filter2.predicate.FilterApi
35 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
36 import org.apache.parquet.hadoop._
37 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
38 import org.apache.parquet.hadoop.codec.CodecConfig
39 import org.apache.parquet.hadoop.util.ContextUtil
40 import org.apache.parquet.schema.MessageType
41
42 import org.apache.spark.{SparkException, TaskContext}
43 import org.apache.spark.internal.Logging
44 import org.apache.spark.sql._
45 import org.apache.spark.sql.catalyst.InternalRow
46 import org.apache.spark.sql.catalyst.expressions._
47 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
48 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
49 import org.apache.spark.sql.catalyst.util.DateTimeUtils
50 import org.apache.spark.sql.execution.datasources._
51 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
52 import org.apache.spark.sql.internal.SQLConf
53 import org.apache.spark.sql.sources._
54 import org.apache.spark.sql.types._
55 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
56
57 class ParquetFileFormat
58   extends FileFormat
59   with DataSourceRegister
60   with Logging
61   with Serializable {
62   // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
63   // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
64   // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
65   // here.
66   private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
67
68   override def shortName(): String = "parquet"
69
70   override def toString: String = "Parquet"
71
72   override def hashCode(): Int = getClass.hashCode()
73
74   override def equals(other: Any): Boolean = other.isInstanceOf[ParquetFileFormat]
75
76   override def prepareWrite(
77       sparkSession: SparkSession,
78       job: Job,
79       options: Map[String, String],
80       dataSchema: StructType): OutputWriterFactory = {
81     DataSourceUtils.verifyWriteSchema(this, dataSchema)
82
83     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
84
85     val conf = ContextUtil.getConfiguration(job)
86
87     val committerClass =
88       conf.getClass(
89         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
90         classOf[ParquetOutputCommitter],
91         classOf[OutputCommitter])
92
93     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
94       logInfo("Using default output committer for Parquet: " +
95         classOf[ParquetOutputCommitter].getCanonicalName)
96     } else {
97       logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
98     }
99
100     conf.setClass(
101       SQLConf.OUTPUT_COMMITTER_CLASS.key,
102       committerClass,
103       classOf[OutputCommitter])
104
105     // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
106     // it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The reason why
107     // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
108     // bundled with `ParquetOutputFormat[Row]`.
109     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
110
111     ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
112
113     // This metadata is useful for keeping UDTs like Vector/Matrix.
114     ParquetWriteSupport.setSchema(dataSchema, conf)
115
116     // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
117     // schema and writes actual rows to Parquet files.
118     conf.set(
119       SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
120       sparkSession.sessionState.conf.writeLegacyParquetFormat.toString)
121
122     conf.set(
123       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
124       sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
125
126     // Sets compression scheme
127     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
128
129     // SPARK-15719: Disables writing Parquet summary files by default.
130     if (conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null
131       && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
132       conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE)
133     }
134
135     if (ParquetOutputFormat.getJobSummaryLevel(conf) == JobSummaryLevel.NONE
136       && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
137       // output summary is requested, but the class is not a Parquet Committer
138       logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
139         s" create job summaries. " +
140         s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
141     }
142
143     new OutputWriterFactory {
144       // This OutputWriterFactory instance is deserialized when writing Parquet files on the
145       // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
146       // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
147       // initialized.
148       private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
149
150         override def newInstance(
151           path: String,
152           dataSchema: StructType,
153           context: TaskAttemptContext): OutputWriter = {
154         new ParquetOutputWriter(path, context)
155       }
156
157       override def getFileExtension(context: TaskAttemptContext): String = {
158         CodecConfig.from(context).getCodec.getExtension + ".parquet"
159       }
160     }
161   }
162
163   override def inferSchema(
164       sparkSession: SparkSession,
165       parameters: Map[String, String],
166       files: Seq[FileStatus]): Option[StructType] = {
167     val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)
168
169     // Should we merge schemas from all Parquet part-files?
170     val shouldMergeSchemas = parquetOptions.mergeSchema
171
172     val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
173
174     val filesByType = splitFiles(files)
175
176     // Sees which file(s) we need to touch in order to figure out the schema.
177     //
178     // Always tries the summary files first if users don't require a merged schema.  In this case,
179     // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
180     // groups information, and could be much smaller for large Parquet files with lots of row
181     // groups.  If no summary file is available, falls back to some random part-file.
182     //
183     // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
184     // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
185     // how to merge them correctly if some key is associated with different values in different
186     // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
187     // implies that if a summary file presents, then:
188     //
189     //   1. Either all part-files have exactly the same Spark SQL schema, or
190     //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
191     //      their schemas may differ from each other).
192     //
193     // Here we tend to be pessimistic and take the second case into account.  Basically this means
194     // we can't trust the summary files if users require a merged schema, and must touch all part-
195     // files to do the merge.
196     val filesToTouch =
197       if (shouldMergeSchemas) {
198         // Also includes summary files, 'cause there might be empty partition directories.
199
200         // If mergeRespectSummaries config is true, we assume that all part-files are the same for
201         // their schema with summary files, so we ignore them when merging schema.
202         // If the config is disabled, which is the default setting, we merge all part-files.
203         // In this mode, we only need to merge schemas contained in all those summary files.
204         // You should enable this configuration only if you are very sure that for the parquet
205         // part-files to read there are corresponding summary files containing correct schema.
206
207         // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
208         // the ordering of the output columns. There are several things to mention here.
209         //
210         //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
211         //     the first part-file so that the columns of the lexicographically first file show
212         //     first.
213         //
214         //  2. If mergeRespectSummaries config is true, then there should be, at least,
215         //     "_metadata"s for all given files, so that we can ensure the columns of
216         //     the lexicographically first file show first.
217         //
218         //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
219         //     no guarantee of the output order, since there might not be a summary file for the
220         //     lexicographically first file, which ends up putting ahead the columns of
221         //     the other files. However, this should be okay since not enabling
222         //     shouldMergeSchemas means (assumes) all the files have the same schemas.
223
224         val needMerged: Seq[FileStatus] =
225           if (mergeRespectSummaries) {
226             Seq.empty
227           } else {
228             filesByType.data
229           }
230         needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
231       } else {
232         // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
233         // don't have this.
234         filesByType.commonMetadata.headOption
235             // Falls back to "_metadata"
236             .orElse(filesByType.metadata.headOption)
237             // Summary file(s) not found, the Parquet file is either corrupted, or different part-
238             // files contain conflicting user defined metadata (two or more values are associated
239             // with a same key in different files).  In either case, we fall back to any of the
240             // first part-file, and just assume all schemas are consistent.
241             .orElse(filesByType.data.headOption)
242             .toSeq
243       }
244     ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
245   }
246
247   case class FileTypes(
248       data: Seq[FileStatus],
249       metadata: Seq[FileStatus],
250       commonMetadata: Seq[FileStatus])
251
252   private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = {
253     val leaves = allFiles.toArray.sortBy(_.getPath.toString)
254
255     FileTypes(
256       data = leaves.filterNot(f => isSummaryFile(f.getPath)),
257       metadata =
258         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE),
259       commonMetadata =
260         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))
261   }
262
263   private def isSummaryFile(file: Path): Boolean = {
264     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
265         file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
266   }
267
268   /**
269    * Returns whether the reader will return the rows as batch or not.
270    */
271   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
272     val conf = sparkSession.sessionState.conf
273     conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
274       schema.length <= conf.wholeStageMaxNumFields &&
275       schema.forall(_.dataType.isInstanceOf[AtomicType])
276   }
277
278   override def vectorTypes(
279       requiredSchema: StructType,
280       partitionSchema: StructType,
281       sqlConf: SQLConf): Option[Seq[String]] = {
282     Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
283       if (!sqlConf.offHeapColumnVectorEnabled) {
284         classOf[OnHeapColumnVector].getName
285       } else {
286         classOf[OffHeapColumnVector].getName
287       }
288     ))
289   }
290
291   override def isSplitable(
292       sparkSession: SparkSession,
293       options: Map[String, String],
294       path: Path): Boolean = {
295     true
296   }
297
298   override def buildReaderWithPartitionValues(
299       sparkSession: SparkSession,
300       dataSchema: StructType,
301       partitionSchema: StructType,
302       requiredSchema: StructType,
303       filters: Seq[Filter],
304       options: Map[String, String],
305       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
306     DataSourceUtils.verifyReadSchema(this, dataSchema)
307
308     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
309     hadoopConf.set(
310       ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
311       requiredSchema.json)
312     hadoopConf.set(
313       ParquetWriteSupport.SPARK_ROW_SCHEMA,
314       requiredSchema.json)
315     hadoopConf.set(
316       SQLConf.SESSION_LOCAL_TIMEZONE.key,
317       sparkSession.sessionState.conf.sessionLocalTimeZone)
318
319     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
320
321     // Sets flags for `ParquetToSparkSchemaConverter`
322     hadoopConf.setBoolean(
323       SQLConf.PARQUET_BINARY_AS_STRING.key,
324       sparkSession.sessionState.conf.isParquetBinaryAsString)
325     hadoopConf.setBoolean(
326       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
327       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
328
329     val broadcastedHadoopConf =
330       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
331
332     // TODO: if you move this into the closure it reverts to the default values.
333     // If true, enable using the custom RecordReader for parquet. This only works for
334     // a subset of the types (no complex types).
335     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
336     val sqlConf = sparkSession.sessionState.conf
337     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
338     val enableVectorizedReader: Boolean =
339       sqlConf.parquetVectorizedReaderEnabled &&
340       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
341     val enableRecordFilter: Boolean =
342       sparkSession.sessionState.conf.parquetRecordFilterEnabled
343     val timestampConversion: Boolean =
344       sparkSession.sessionState.conf.isParquetINT96TimestampConversion
345     val capacity = sqlConf.parquetVectorizedReaderBatchSize
346     val enableParquetFilterPushDown: Boolean =
347       sparkSession.sessionState.conf.parquetFilterPushDown
348     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
349     val returningBatch = supportBatch(sparkSession, resultSchema)
350     val pushDownDate = sqlConf.parquetFilterPushDownDate
351     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
352
353     (file: PartitionedFile) => {
354       assert(file.partitionValues.numFields == partitionSchema.size)
355
356       val fileSplit =
357         new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
358       val filePath = fileSplit.getPath
359
360       val split =
361         new org.apache.parquet.hadoop.ParquetInputSplit(
362           filePath,
363           fileSplit.getStart,
364           fileSplit.getStart + fileSplit.getLength,
365           fileSplit.getLength,
366           fileSplit.getLocations,
367           null)
368
369       val sharedConf = broadcastedHadoopConf.value.value
370
371       // Try to push down filters when filter push-down is enabled.
372       val pushed = if (enableParquetFilterPushDown) {
373         val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
374           .getFileMetaData.getSchema
375         filters
376           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
377           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
378           // is used here.
379           .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
380           .createFilter(parquetSchema, _))
381           .reduceOption(FilterApi.and)
382       } else {
383         None
384       }
385
386       // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
387       // *only* if the file was created by something other than "parquet-mr", so check the actual
388       // writer here for this file.  We have to do this per-file, as each file in the table may
389       // have different writers.
390       def isCreatedByParquetMr(): Boolean = {
391         val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
392         footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
393       }
394       val convertTz =
395         if (timestampConversion && !isCreatedByParquetMr()) {
396           Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
397         } else {
398           None
399         }
400
401       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
402       val hadoopAttemptContext =
403         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
404
405       // Try to push down filters when filter push-down is enabled.
406       // Notice: This push-down is RowGroups level, not individual records.
407       if (pushed.isDefined) {
408         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
409       }
410       val taskContext = Option(TaskContext.get())
411       if (enableVectorizedReader) {
412         val vectorizedReader = new VectorizedParquetRecordReader(
413           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
414         val iter = new RecordReaderIterator(vectorizedReader)
415         // SPARK-23457 Register a task completion lister before `initialization`.
416         taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
417         vectorizedReader.initialize(split, hadoopAttemptContext)
418         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
419         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
420         if (returningBatch) {
421           vectorizedReader.enableReturningBatches()
422         }
423
424         // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
425         iter.asInstanceOf[Iterator[InternalRow]]
426       } else {
427         logDebug(s"Falling back to parquet-mr")
428         // ParquetRecordReader returns UnsafeRow
429         val reader = if (pushed.isDefined && enableRecordFilter) {
430           val parquetFilter = FilterCompat.get(pushed.get, null)
431           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
432         } else {
433           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
434         }
435         val iter = new RecordReaderIterator(reader)
436         // SPARK-23457 Register a task completion lister before `initialization`.
437         taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
438         reader.initialize(split, hadoopAttemptContext)
439
440         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
441         val joinedRow = new JoinedRow()
442         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
443
444         // This is a horrible erasure hack...  if we type the iterator above, then it actually check
445         // the type in next() and we get a class cast exception.  If we make that function return
446         // Object, then we can defer the cast until later!
447         if (partitionSchema.length == 0) {
448           // There is no partition columns
449           iter.asInstanceOf[Iterator[InternalRow]]
450         } else {
451           iter.asInstanceOf[Iterator[InternalRow]]
452             .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
453         }
454       }
455     }
456   }
457 }
458
459 object ParquetFileFormat extends Logging {
460   private[parquet] def readSchema(
461       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
462
463     val converter = new ParquetToSparkSchemaConverter(
464       sparkSession.sessionState.conf.isParquetBinaryAsString,
465       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
466
467     val seen = mutable.HashSet[String]()
468     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
469       val metadata = footer.getParquetMetadata.getFileMetaData
470       val serializedSchema = metadata
471         .getKeyValueMetaData
472         .asScala.toMap
473         .get(ParquetReadSupport.SPARK_METADATA_KEY)
474       if (serializedSchema.isEmpty) {
475         // Falls back to Parquet schema if no Spark SQL schema found.
476         Some(converter.convert(metadata.getSchema))
477       } else if (!seen.contains(serializedSchema.get)) {
478         seen += serializedSchema.get
479
480         // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
481         // whatever is available.
482         Some(Try(DataType.fromJson(serializedSchema.get))
483           .recover { case _: Throwable =>
484             logInfo(
485               "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
486                 "falling back to the deprecated DataType.fromCaseClassString parser.")
487             LegacyTypeStringParser.parse(serializedSchema.get)
488           }
489           .recover { case cause: Throwable =>
490             logWarning(
491               s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
492                  |\t$serializedSchema
493                """.stripMargin,
494               cause)
495           }
496           .map(_.asInstanceOf[StructType])
497           .getOrElse {
498             // Falls back to Parquet schema if Spark SQL schema can't be parsed.
499             converter.convert(metadata.getSchema)
500           })
501       } else {
502         None
503       }
504     }
505
506     finalSchemas.reduceOption { (left, right) =>
507       try left.merge(right) catch { case e: Throwable =>
508         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
509       }
510     }
511   }
512
513   /**
514    * Reads Parquet footers in multi-threaded manner.
515    * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
516    * files when reading footers.
517    */
518   private[parquet] def readParquetFootersInParallel(
519       conf: Configuration,
520       partFiles: Seq[FileStatus],
521       ignoreCorruptFiles: Boolean): Seq[Footer] = {
522     val parFiles = partFiles.par
523     val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
524     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
525     try {
526       parFiles.flatMap { currentFile =>
527         try {
528           // Skips row group information since we only need the schema.
529           // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
530           // when it can't read the footer.
531           Some(new Footer(currentFile.getPath(),
532             ParquetFileReader.readFooter(
533               conf, currentFile, SKIP_ROW_GROUPS)))
534         } catch { case e: RuntimeException =>
535           if (ignoreCorruptFiles) {
536             logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
537             None
538           } else {
539             throw new IOException(s"Could not read footer for file: $currentFile", e)
540           }
541         }
542       }.seq
543     } finally {
544       pool.shutdown()
545     }
546   }
547
548   /**
549    * Figures out a merged Parquet schema with a distributed Spark job.
550    *
551    * Note that locality is not taken into consideration here because:
552    *
553    *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
554    *     that file.  Thus we only need to retrieve the location of the last block.  However, Hadoop
555    *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
556    *     potentially expensive.
557    *
558    *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
559    *     slow.  And basically locality is not available when using S3 (you can't run computation on
560    *     S3 nodes).
561    */
562   def mergeSchemasInParallel(
563       filesToTouch: Seq[FileStatus],
564       sparkSession: SparkSession): Option[StructType] = {
565     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
566     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
567     val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
568
569     // !! HACK ALERT !!
570     //
571     // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
572     // to executor side to avoid fetching them again.  However, `FileStatus` is not `Serializable`
573     // but only `Writable`.  What makes it worse, for some reason, `FileStatus` doesn't play well
574     // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.  These
575     // facts virtually prevents us to serialize `FileStatus`es.
576     //
577     // Since Parquet only relies on path and length information of those `FileStatus`es to read
578     // footers, here we just extract them (which can be easily serialized), send them to executor
579     // side, and resemble fake `FileStatus`es there.
580     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
581
582     // Set the number of partitions to prevent following schema reads from generating many tasks
583     // in case of a small number of parquet files.
584     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
585       sparkSession.sparkContext.defaultParallelism)
586
587     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
588
589     // Issues a Spark job to read Parquet schema in parallel.
590     val partiallyMergedSchemas =
591       sparkSession
592         .sparkContext
593         .parallelize(partialFileStatusInfo, numParallelism)
594         .mapPartitions { iterator =>
595           // Resembles fake `FileStatus`es with serialized path and length information.
596           val fakeFileStatuses = iterator.map { case (path, length) =>
597             new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
598           }.toSeq
599
600           // Reads footers in multi-threaded manner within each task
601           val footers =
602             ParquetFileFormat.readParquetFootersInParallel(
603               serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
604
605           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
606           val converter = new ParquetToSparkSchemaConverter(
607             assumeBinaryIsString = assumeBinaryIsString,
608             assumeInt96IsTimestamp = assumeInt96IsTimestamp)
609           if (footers.isEmpty) {
610             Iterator.empty
611           } else {
612             var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
613             footers.tail.foreach { footer =>
614               val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
615               try {
616                 mergedSchema = mergedSchema.merge(schema)
617               } catch { case cause: SparkException =>
618                 throw new SparkException(
619                   s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
620               }
621             }
622             Iterator.single(mergedSchema)
623           }
624         }.collect()
625
626     if (partiallyMergedSchemas.isEmpty) {
627       None
628     } else {
629       var finalSchema = partiallyMergedSchemas.head
630       partiallyMergedSchemas.tail.foreach { schema =>
631         try {
632           finalSchema = finalSchema.merge(schema)
633         } catch { case cause: SparkException =>
634           throw new SparkException(
635             s"Failed merging schema:\n${schema.treeString}", cause)
636         }
637       }
638       Some(finalSchema)
639     }
640   }
641
642   /**
643    * Reads Spark SQL schema from a Parquet footer.  If a valid serialized Spark SQL schema string
644    * can be found in the file metadata, returns the deserialized [[StructType]], otherwise, returns
645    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
646    */
647   def readSchemaFromFooter(
648       footer: Footer, converter: ParquetToSparkSchemaConverter): StructType = {
649     val fileMetaData = footer.getParquetMetadata.getFileMetaData
650     fileMetaData
651       .getKeyValueMetaData
652       .asScala.toMap
653       .get(ParquetReadSupport.SPARK_METADATA_KEY)
654       .flatMap(deserializeSchemaString)
655       .getOrElse(converter.convert(fileMetaData.getSchema))
656   }
657
658   private def deserializeSchemaString(schemaString: String): Option[StructType] = {
659     // Tries to deserialize the schema string as JSON first, then falls back to the case class
660     // string parser (data generated by older versions of Spark SQL uses this format).
661     Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
662       case _: Throwable =>
663         logInfo(
664           "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
665             "falling back to the deprecated DataType.fromCaseClassString parser.")
666         LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType]
667     }.recoverWith {
668       case cause: Throwable =>
669         logWarning(
670           "Failed to parse and ignored serialized Spark schema in " +
671             s"Parquet key-value metadata:\n\t$schemaString", cause)
672         Failure(cause)
673     }.toOption
674   }
675 }