023e12788829063555b9925c467b76b73f81b0d2
[spark.git] / sql / core / src / main / scala / org / apache / spark / sql / execution / datasources / FileFormat.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql.execution.datasources
19
20 import org.apache.hadoop.conf.Configuration
21 import org.apache.hadoop.fs._
22 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
23 import org.apache.hadoop.mapreduce.Job
24
25 import org.apache.spark.sql._
26 import org.apache.spark.sql.catalyst.InternalRow
27 import org.apache.spark.sql.catalyst.expressions._
28 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
29 import org.apache.spark.sql.internal.SQLConf
30 import org.apache.spark.sql.sources.Filter
31 import org.apache.spark.sql.types.StructType
32
33
34 /**
35  * Used to read and write data stored in files to/from the [[InternalRow]] format.
36  */
37 trait FileFormat {
38   /**
39    * When possible, this method should return the schema of the given `files`.  When the format
40    * does not support inference, or no valid files are given should return None.  In these cases
41    * Spark will require that user specify the schema manually.
42    */
43   def inferSchema(
44       sparkSession: SparkSession,
45       options: Map[String, String],
46       files: Seq[FileStatus]): Option[StructType]
47
48   /**
49    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
50    * be put here.  For example, user defined output committer can be configured here
51    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
52    */
53   def prepareWrite(
54       sparkSession: SparkSession,
55       job: Job,
56       options: Map[String, String],
57       dataSchema: StructType): OutputWriterFactory
58
59   /**
60    * Returns whether this format support returning columnar batch or not.
61    *
62    * TODO: we should just have different traits for the different formats.
63    */
64   def supportBatch(sparkSession: SparkSession, dataSchema: StructType): Boolean = {
65     false
66   }
67
68   /**
69    * Returns concrete column vector class names for each column to be used in a columnar batch
70    * if this format supports returning columnar batch.
71    */
72   def vectorTypes(
73       requiredSchema: StructType,
74       partitionSchema: StructType,
75       sqlConf: SQLConf): Option[Seq[String]] = {
76     None
77   }
78
79   /**
80    * Returns whether a file with `path` could be split or not.
81    */
82   def isSplitable(
83       sparkSession: SparkSession,
84       options: Map[String, String],
85       path: Path): Boolean = {
86     false
87   }
88
89   /**
90    * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
91    *
92    * @param dataSchema The global data schema. It can be either specified by the user, or
93    *                   reconciled/merged from all underlying data files. If any partition columns
94    *                   are contained in the files, they are preserved in this schema.
95    * @param partitionSchema The schema of the partition column row that will be present in each
96    *                        PartitionedFile. These columns should be appended to the rows that
97    *                        are produced by the iterator.
98    * @param requiredSchema The schema of the data that should be output for each row.  This may be a
99    *                       subset of the columns that are present in the file if column pruning has
100    *                       occurred.
101    * @param filters A set of filters than can optionally be used to reduce the number of rows output
102    * @param options A set of string -> string configuration options.
103    * @return
104    */
105   protected def buildReader(
106       sparkSession: SparkSession,
107       dataSchema: StructType,
108       partitionSchema: StructType,
109       requiredSchema: StructType,
110       filters: Seq[Filter],
111       options: Map[String, String],
112       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
113     throw new UnsupportedOperationException(s"buildReader is not supported for $this")
114   }
115
116   /**
117    * Exactly the same as [[buildReader]] except that the reader function returned by this method
118    * appends partition values to [[InternalRow]]s produced by the reader function [[buildReader]]
119    * returns.
120    */
121   def buildReaderWithPartitionValues(
122       sparkSession: SparkSession,
123       dataSchema: StructType,
124       partitionSchema: StructType,
125       requiredSchema: StructType,
126       filters: Seq[Filter],
127       options: Map[String, String],
128       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
129     val dataReader = buildReader(
130       sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
131
132     new (PartitionedFile => Iterator[InternalRow]) with Serializable {
133       private val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
134
135       private val joinedRow = new JoinedRow()
136
137       // Using lazy val to avoid serialization
138       private lazy val appendPartitionColumns =
139         GenerateUnsafeProjection.generate(fullSchema, fullSchema)
140
141       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
142         // Using local val to avoid per-row lazy val check (pre-mature optimization?...)
143         val converter = appendPartitionColumns
144
145         // Note that we have to apply the converter even though `file.partitionValues` is empty.
146         // This is because the converter is also responsible for converting safe `InternalRow`s into
147         // `UnsafeRow`s.
148         dataReader(file).map { dataRow =>
149           converter(joinedRow(dataRow, file.partitionValues))
150         }
151       }
152     }
153   }
154
155 }
156
157 /**
158  * The base class file format that is based on text file.
159  */
160 abstract class TextBasedFileFormat extends FileFormat {
161   private var codecFactory: CompressionCodecFactory = _
162
163   override def isSplitable(
164       sparkSession: SparkSession,
165       options: Map[String, String],
166       path: Path): Boolean = {
167     if (codecFactory == null) {
168       codecFactory = new CompressionCodecFactory(
169         sparkSession.sessionState.newHadoopConfWithOptions(options))
170     }
171     val codec = codecFactory.getCodec(path)
172     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
173   }
174 }