[SPARK-24768][SQL] Have a built-in AVRO data source implementation
author: Gengliang Wang <gengliang.wang@databricks.com>
Thu, 12 Jul 2018 20:55:25 +0000 (13:55 -0700)
committer: Xiao Li <gatorsmile@gmail.com>
Thu, 12 Jul 2018 20:55:25 +0000 (13:55 -0700)
## What changes were proposed in this pull request?

Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystems, especially for Kafka-based data pipelines. Using the external package https://github.com/databricks/spark-avro, Spark SQL can already read and write Avro data. Making spark-avro built-in provides a better experience for first-time users of Spark SQL and Structured Streaming, and we expect the built-in Avro data source to further improve the adoption of Structured Streaming.
The proposal is to inline the code from the spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.
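With the data source built in, it can be selected by its short format name, `"avro"`, which is registered via `DataSourceRegister` (see `AvroFileFormat.shortName` below). A minimal usage sketch — the input/output paths are illustrative, and the codec setting just shows the `spark.sql.avro.compression.codec` option introduced by this patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-example").getOrCreate()

// Read Avro files via the built-in "avro" format; the schema is inferred
// from a sample file unless an "avroSchema" option is provided.
val df = spark.read.format("avro").load("/path/to/episodes.avro")

// Write Avro, optionally overriding the compression codec (default: snappy).
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
df.write.format("avro").save("/path/to/output")
```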

[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21742 from gengliangwang/export_avro.

30 files changed:
dev/run-tests.py
dev/sparktestsupport/modules.py
external/avro/pom.xml [new file with mode: 0644]
external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister [new file with mode: 0644]
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala [new file with mode: 0755]
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala [new file with mode: 0644]
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala [new file with mode: 0644]
external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala [new file with mode: 0644]
external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala [new file with mode: 0755]
external/avro/src/test/resources/episodes.avro [new file with mode: 0644]
external/avro/src/test/resources/log4j.properties [new file with mode: 0644]
external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro [new file with mode: 0755]
external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro [new file with mode: 0755]
external/avro/src/test/resources/test.avro [new file with mode: 0644]
external/avro/src/test/resources/test.avsc [new file with mode: 0644]
external/avro/src/test/resources/test.json [new file with mode: 0644]
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala [new file with mode: 0644]
external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala [new file with mode: 0755]
external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala [new file with mode: 0755]
pom.xml
project/SparkBuild.scala

index cd45908..d9d3789 100755 (executable)
@@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules):
     ['graphx', 'examples']
     >>> x = [x.name for x in determine_modules_to_test([modules.sql])]
     >>> x # doctest: +NORMALIZE_WHITESPACE
-    ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
+    ['sql', 'avro', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
      'pyspark-sql', 'repl', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
     """
     modules_to_test = set()
index dfea762..2aa3555 100644 (file)
@@ -170,6 +170,16 @@ hive_thriftserver = Module(
     ]
 )
 
+avro = Module(
+    name="avro",
+    dependencies=[sql],
+    source_file_regexes=[
+        "external/avro",
+    ],
+    sbt_test_goals=[
+        "avro/test",
+    ]
+)
 
 sql_kafka = Module(
     name="sql-kafka-0-10",
diff --git a/external/avro/pom.xml b/external/avro/pom.xml
new file mode 100644 (file)
index 0000000..42e865b
--- /dev/null
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-sql-avro_2.11</artifactId>
+  <properties>
+    <sbt.project.name>avro</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Avro</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644 (file)
index 0000000..95835f0
--- /dev/null
@@ -0,0 +1 @@
+org.apache.spark.sql.avro.AvroFileFormat
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
new file mode 100755 (executable)
index 0000000..46e5a18
--- /dev/null
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.net.URI
+import java.util.zip.Deflater
+
+import scala.util.control.NonFatal
+
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.file.{DataFileConstants, DataFileReader}
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
+import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapreduce.AvroJob
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
+  private val log = LoggerFactory.getLogger(getClass)
+
+  override def equals(other: Any): Boolean = other match {
+    case _: AvroFileFormat => true
+    case _ => false
+  }
+
+  // Dummy hashCode() to appease ScalaStyle.
+  override def hashCode(): Int = super.hashCode()
+
+  override def inferSchema(
+      spark: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+
+    // Schema evolution is not supported yet. Here we only pick a single random sample file to
+    // figure out the schema of the whole dataset.
+    val sampleFile =
+      if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
+        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+          throw new FileNotFoundException(
+            "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
+              "is set to true. Do all input files have \".avro\" extension?"
+          )
+        }
+      } else {
+        files.headOption.getOrElse {
+          throw new FileNotFoundException("No Avro files found.")
+        }
+      }
+
+    // The user can specify an optional Avro JSON schema.
+    val avroSchema = options.get(AvroFileFormat.AvroSchema)
+      .map(new Schema.Parser().parse)
+      .getOrElse {
+        val in = new FsInput(sampleFile.getPath, conf)
+        try {
+          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
+          try {
+            reader.getSchema
+          } finally {
+            reader.close()
+          }
+        } finally {
+          in.close()
+        }
+      }
+
+    SchemaConverters.toSqlType(avroSchema).dataType match {
+      case t: StructType => Some(t)
+      case _ => throw new RuntimeException(
+        s"""Avro schema cannot be converted to a Spark SQL StructType:
+           |
+           |${avroSchema.toString(true)}
+           |""".stripMargin)
+    }
+  }
+
+  override def shortName(): String = "avro"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = true
+
+  override def prepareWrite(
+      spark: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val recordName = options.getOrElse("recordName", "topLevelRecord")
+    val recordNamespace = options.getOrElse("recordNamespace", "")
+    val build = SchemaBuilder.record(recordName).namespace(recordNamespace)
+    val outputAvroSchema = SchemaConverters.convertStructToAvro(dataSchema, build, recordNamespace)
+
+    AvroJob.setOutputKeySchema(job, outputAvroSchema)
+    val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
+    val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
+    val COMPRESS_KEY = "mapred.output.compress"
+
+    spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+      case "uncompressed" =>
+        log.info("writing uncompressed Avro records")
+        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
+
+      case "snappy" =>
+        log.info("compressing Avro output using Snappy")
+        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
+
+      case "deflate" =>
+        val deflateLevel = spark.conf.get(
+          AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
+        job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+
+      case unknown: String =>
+        log.error(s"unsupported compression codec $unknown")
+    }
+
+    new AvroOutputWriterFactory(dataSchema, recordName, recordNamespace)
+  }
+
+  override def buildReader(
+      spark: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val broadcastedConf =
+      spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+
+    (file: PartitionedFile) => {
+      val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
+      val conf = broadcastedConf.value.value
+      val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+
+      // TODO: Remove this check once `FileFormat` gets a general file filtering interface method.
+      // Doing input file filtering is improper because we may generate empty tasks that process no
+      // input files but stress the scheduler. We should probably add a more general input file
+      // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
+      if (
+        conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
+        !file.filePath.endsWith(".avro")
+      ) {
+        Iterator.empty
+      } else {
+        val reader = {
+          val in = new FsInput(new Path(new URI(file.filePath)), conf)
+          try {
+            val datumReader = userProvidedSchema match {
+              case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
+              case _ => new GenericDatumReader[GenericRecord]()
+            }
+            DataFileReader.openReader(in, datumReader)
+          } catch {
+            case NonFatal(e) =>
+              log.error("Exception while opening DataFileReader", e)
+              in.close()
+              throw e
+          }
+        }
+
+        // Ensure that the reader is closed even if the task fails or doesn't consume the entire
+        // iterator of records.
+        Option(TaskContext.get()).foreach { taskContext =>
+          taskContext.addTaskCompletionListener { _ =>
+            reader.close()
+          }
+        }
+
+        reader.sync(file.start)
+        val stop = file.start + file.length
+
+        val rowConverter = SchemaConverters.createConverterToSQL(
+          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
+
+        new Iterator[InternalRow] {
+          // Used to convert `Row`s containing data columns into `InternalRow`s.
+          private val encoderForDataColumns = RowEncoder(requiredSchema)
+
+          private[this] var completed = false
+
+          override def hasNext: Boolean = {
+            if (completed) {
+              false
+            } else {
+              val r = reader.hasNext && !reader.pastSync(stop)
+              if (!r) {
+                reader.close()
+                completed = true
+              }
+              r
+            }
+          }
+
+          override def next(): InternalRow = {
+            if (reader.pastSync(stop)) {
+              throw new NoSuchElementException("next on empty iterator")
+            }
+            val record = reader.next()
+            val safeDataRow = rowConverter(record).asInstanceOf[GenericRow]
+
+            // The safeDataRow is reused, so we must make a copy.
+            encoderForDataColumns.toRow(safeDataRow)
+          }
+        }
+      }
+    }
+  }
+}
+
+private[avro] object AvroFileFormat {
+  val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
+
+  val AvroSchema = "avroSchema"
+
+  class SerializableConfiguration(@transient var value: Configuration)
+      extends Serializable with KryoSerializable {
+    @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
+
+    private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+      out.defaultWriteObject()
+      value.write(out)
+    }
+
+    private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+      value = new Configuration(false)
+      value.readFields(in)
+    }
+
+    private def tryOrIOException[T](block: => T): T = {
+      try {
+        block
+      } catch {
+        case e: IOException =>
+          log.error("Exception encountered", e)
+          throw e
+        case NonFatal(e) =>
+          log.error("Exception encountered", e)
+          throw new IOException(e)
+      }
+    }
+
+    def write(kryo: Kryo, out: Output): Unit = {
+      val dos = new DataOutputStream(out)
+      value.write(dos)
+      dos.flush()
+    }
+
+    def read(kryo: Kryo, in: Input): Unit = {
+      value = new Configuration(false)
+      value.readFields(new DataInputStream(in))
+    }
+  }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
new file mode 100644 (file)
index 0000000..830bf3c
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.{IOException, OutputStream}
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.util.HashMap
+
+import scala.collection.immutable.Map
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.mapred.AvroKey
+import org.apache.avro.mapreduce.AvroKeyOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.types._
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[avro] class AvroOutputWriter(
+    path: String,
+    context: TaskAttemptContext,
+    schema: StructType,
+    recordName: String,
+    recordNamespace: String) extends OutputWriter {
+
+  private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace)
+  // copy of the old conversion logic after api change in SPARK-19085
+  private lazy val internalRowConverter =
+    CatalystTypeConverters.createToScalaConverter(schema).asInstanceOf[InternalRow => Row]
+
+  /**
+   * Overrides the methods responsible for generating the output streams / files so
+   * that the data can be correctly partitioned.
+   */
+  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
+    new AvroKeyOutputFormat[GenericRecord]() {
+
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+
+      @throws(classOf[IOException])
+      override def getAvroFileOutputStream(c: TaskAttemptContext): OutputStream = {
+        val path = getDefaultWorkFile(context, ".avro")
+        path.getFileSystem(context.getConfiguration).create(path)
+      }
+
+    }.getRecordWriter(context)
+
+  override def write(internalRow: InternalRow): Unit = {
+    val row = internalRowConverter(internalRow)
+    val key = new AvroKey(converter(row).asInstanceOf[GenericRecord])
+    recordWriter.write(key, NullWritable.get())
+  }
+
+  override def close(): Unit = recordWriter.close(context)
+
+  /**
+   * Constructs a converter function for a given Spark SQL data type. This is used when
+   * writing Avro records out to disk.
+   */
+  private def createConverterToAvro(
+      dataType: DataType,
+      structName: String,
+      recordNamespace: String): (Any) => Any = {
+    dataType match {
+      case BinaryType => (item: Any) => item match {
+        case null => null
+        case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
+      }
+      case ByteType | ShortType | IntegerType | LongType |
+           FloatType | DoubleType | StringType | BooleanType => identity
+      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+      case TimestampType => (item: Any) =>
+        if (item == null) null else item.asInstanceOf[Timestamp].getTime
+      case DateType => (item: Any) =>
+        if (item == null) null else item.asInstanceOf[Date].getTime
+      case ArrayType(elementType, _) =>
+        val elementConverter = createConverterToAvro(
+          elementType,
+          structName,
+          SchemaConverters.getNewRecordNamespace(elementType, recordNamespace, structName))
+        (item: Any) => {
+          if (item == null) {
+            null
+          } else {
+            val sourceArray = item.asInstanceOf[Seq[Any]]
+            val sourceArraySize = sourceArray.size
+            val targetArray = new Array[Any](sourceArraySize)
+            var idx = 0
+            while (idx < sourceArraySize) {
+              targetArray(idx) = elementConverter(sourceArray(idx))
+              idx += 1
+            }
+            targetArray
+          }
+        }
+      case MapType(StringType, valueType, _) =>
+        val valueConverter = createConverterToAvro(
+          valueType,
+          structName,
+          SchemaConverters.getNewRecordNamespace(valueType, recordNamespace, structName))
+        (item: Any) => {
+          if (item == null) {
+            null
+          } else {
+            val javaMap = new HashMap[String, Any]()
+            item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
+              javaMap.put(key, valueConverter(value))
+            }
+            javaMap
+          }
+        }
+      case structType: StructType =>
+        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
+        val schema: Schema = SchemaConverters.convertStructToAvro(
+          structType, builder, recordNamespace)
+        val fieldConverters = structType.fields.map(field =>
+          createConverterToAvro(
+            field.dataType,
+            field.name,
+            SchemaConverters.getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
+        (item: Any) => {
+          if (item == null) {
+            null
+          } else {
+            val record = new Record(schema)
+            val convertersIterator = fieldConverters.iterator
+            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
+            val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+
+            while (convertersIterator.hasNext) {
+              val converter = convertersIterator.next()
+              record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
+            }
+            record
+          }
+        }
+    }
+  }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
new file mode 100644 (file)
index 0000000..5b2ce7d
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroOutputWriterFactory(
+    schema: StructType,
+    recordName: String,
+    recordNamespace: String) extends OutputWriterFactory {
+
+  override def getFileExtension(context: TaskAttemptContext): String = ".avro"
+
+  override def newInstance(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter = {
+    new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
+  }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
new file mode 100644 (file)
index 0000000..01f8c74
--- /dev/null
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.SchemaBuilder._
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.GenericFixed
+
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains methods that are used to convert Spark SQL schemas to Avro schemas
+ * and vice versa.
+ */
+object SchemaConverters {
+
+  class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * This function takes an Avro schema and returns a Spark SQL schema.
+   */
+  def toSqlType(avroSchema: Schema): SchemaType = {
+    avroSchema.getType match {
+      case INT => SchemaType(IntegerType, nullable = false)
+      case STRING => SchemaType(StringType, nullable = false)
+      case BOOLEAN => SchemaType(BooleanType, nullable = false)
+      case BYTES => SchemaType(BinaryType, nullable = false)
+      case DOUBLE => SchemaType(DoubleType, nullable = false)
+      case FLOAT => SchemaType(FloatType, nullable = false)
+      case LONG => SchemaType(LongType, nullable = false)
+      case FIXED => SchemaType(BinaryType, nullable = false)
+      case ENUM => SchemaType(StringType, nullable = false)
+
+      case RECORD =>
+        val fields = avroSchema.getFields.asScala.map { f =>
+          val schemaType = toSqlType(f.schema())
+          StructField(f.name, schemaType.dataType, schemaType.nullable)
+        }
+
+        SchemaType(StructType(fields), nullable = false)
+
+      case ARRAY =>
+        val schemaType = toSqlType(avroSchema.getElementType)
+        SchemaType(
+          ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
+          nullable = false)
+
+      case MAP =>
+        val schemaType = toSqlType(avroSchema.getValueType)
+        SchemaType(
+          MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
+          nullable = false)
+
+      case UNION =>
+        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+          // In case of a union with null, eliminate it and make a recursive call
+          val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+          if (remainingUnionTypes.size == 1) {
+            toSqlType(remainingUnionTypes.head).copy(nullable = true)
+          } else {
+            toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true)
+          }
+        } else avroSchema.getTypes.asScala.map(_.getType) match {
+          case Seq(t1) =>
+            toSqlType(avroSchema.getTypes.get(0))
+          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
+            SchemaType(LongType, nullable = false)
+          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
+            SchemaType(DoubleType, nullable = false)
+          case _ =>
+            // Convert complex unions to struct types where field names are member0, member1, etc.
+            // This is consistent with the behavior when converting between Avro and Parquet.
+            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
+              case (s, i) =>
+                val schemaType = toSqlType(s)
+                // All fields are nullable because only one of them is set at a time
+                StructField(s"member$i", schemaType.dataType, nullable = true)
+            }
+
+            SchemaType(StructType(fields), nullable = false)
+        }
+
+      case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
+    }
+  }
+
+  /**
+   * Converts a Spark SQL StructType into an Avro schema. This method delegates to two other
+   * converter methods to do the conversion.
+   */
+  def convertStructToAvro[T](
+      structType: StructType,
+      schemaBuilder: RecordBuilder[T],
+      recordNamespace: String): T = {
+    val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields()
+    structType.fields.foreach { field =>
+      val newField = fieldsAssembler.name(field.name).`type`()
+
+      if (field.nullable) {
+        convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace)
+          .noDefault
+      } else {
+        convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace)
+          .noDefault
+      }
+    }
+    fieldsAssembler.endRecord()
+  }
+
+  /**
+   * Returns a converter function that converts a row in Avro format to a Catalyst GenericRow.
+   *
+   * @param sourceAvroSchema Source schema before conversion, either inferred from the Avro file
+   *                         or passed in by the user.
+   * @param targetSqlType Target Catalyst SQL type after the conversion.
+   * @return a converter function that converts a row in Avro format to a Catalyst GenericRow.
+   */
+  private[avro] def createConverterToSQL(
+    sourceAvroSchema: Schema,
+    targetSqlType: DataType): AnyRef => AnyRef = {
+
+    def createConverter(avroSchema: Schema,
+        sqlType: DataType, path: List[String]): AnyRef => AnyRef = {
+      val avroType = avroSchema.getType
+      (sqlType, avroType) match {
+        // Avro strings are in Utf8, so we have to call toString on them
+        case (StringType, STRING) | (StringType, ENUM) =>
+          (item: AnyRef) => item.toString
+        case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) |
+             (FloatType, FLOAT) | (LongType, LONG) =>
+          identity
+        case (TimestampType, LONG) =>
+          (item: AnyRef) => new Timestamp(item.asInstanceOf[Long])
+        case (DateType, LONG) =>
+          (item: AnyRef) => new Date(item.asInstanceOf[Long])
+        // Byte arrays are reused by Avro, so we have to make a copy of them.
+        case (BinaryType, FIXED) =>
+          (item: AnyRef) => item.asInstanceOf[GenericFixed].bytes().clone()
+        case (BinaryType, BYTES) =>
+          (item: AnyRef) =>
+            val byteBuffer = item.asInstanceOf[ByteBuffer]
+            val bytes = new Array[Byte](byteBuffer.remaining)
+            byteBuffer.get(bytes)
+            bytes
+        case (struct: StructType, RECORD) =>
+          val length = struct.fields.length
+          val converters = new Array[AnyRef => AnyRef](length)
+          val avroFieldIndexes = new Array[Int](length)
+          var i = 0
+          while (i < length) {
+            val sqlField = struct.fields(i)
+            val avroField = avroSchema.getField(sqlField.name)
+            if (avroField != null) {
+              val converter = (item: AnyRef) => {
+                if (item == null) {
+                  item
+                } else {
+                  createConverter(avroField.schema, sqlField.dataType, path :+ sqlField.name)(item)
+                }
+              }
+              converters(i) = converter
+              avroFieldIndexes(i) = avroField.pos()
+            } else if (!sqlField.nullable) {
+              throw new IncompatibleSchemaException(
+                s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
+                  "in Avro schema\n" +
+                  s"Source Avro schema: $sourceAvroSchema.\n" +
+                  s"Target Catalyst type: $targetSqlType")
+            }
+            i += 1
+          }
+
+          (item: AnyRef) =>
+            val record = item.asInstanceOf[GenericRecord]
+            val result = new Array[Any](length)
+            var i = 0
+            while (i < converters.length) {
+              if (converters(i) != null) {
+                val converter = converters(i)
+                result(i) = converter(record.get(avroFieldIndexes(i)))
+              }
+              i += 1
+            }
+            new GenericRow(result)
+        case (arrayType: ArrayType, ARRAY) =>
+          val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType,
+            path)
+          val allowsNull = arrayType.containsNull
+          (item: AnyRef) =>
+            item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element =>
+              if (element == null && !allowsNull) {
+                throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " +
+                  "allowed to be null")
+              } else {
+                elementConverter(element)
+              }
+            }
+        case (mapType: MapType, MAP) if mapType.keyType == StringType =>
+          val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path)
+          val allowsNull = mapType.valueContainsNull
+          (item: AnyRef) =>
+            item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { case (k, v) =>
+              if (v == null && !allowsNull) {
+                throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " +
+                  "allowed to be null")
+              } else {
+                (k.toString, valueConverter(v))
+              }
+            }.toMap
+        case (sqlType, UNION) =>
+          if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+            val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+            if (remainingUnionTypes.size == 1) {
+              createConverter(remainingUnionTypes.head, sqlType, path)
+            } else {
+              createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
+            }
+          } else avroSchema.getTypes.asScala.map(_.getType) match {
+            case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
+            case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
+              (item: AnyRef) =>
+                item match {
+                  case l: java.lang.Long => l
+                  case i: java.lang.Integer => new java.lang.Long(i.longValue())
+                }
+            case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType =>
+              (item: AnyRef) =>
+                item match {
+                  case d: java.lang.Double => d
+                  case f: java.lang.Float => new java.lang.Double(f.doubleValue())
+                }
+            case other =>
+              sqlType match {
+                case t: StructType if t.fields.length == avroSchema.getTypes.size =>
+                  val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map {
+                    case (field, schema) =>
+                      createConverter(schema, field.dataType, path :+ field.name)
+                  }
+                  (item: AnyRef) =>
+                    val i = GenericData.get().resolveUnion(avroSchema, item)
+                    val converted = new Array[Any](fieldConverters.length)
+                    converted(i) = fieldConverters(i)(item)
+                    new GenericRow(converted)
+                case _ => throw new IncompatibleSchemaException(
+                  s"Cannot convert Avro schema to catalyst type because schema at path " +
+                    s"${path.mkString(".")} is not compatible " +
+                    s"(avroType = $other, sqlType = $sqlType). \n" +
+                    s"Source Avro schema: $sourceAvroSchema.\n" +
+                    s"Target Catalyst type: $targetSqlType")
+              }
+          }
+        case (left, right) =>
+          throw new IncompatibleSchemaException(
+            s"Cannot convert Avro schema to catalyst type because schema at path " +
+              s"${path.mkString(".")} is not compatible (avroType = $right, sqlType = $left). \n" +
+              s"Source Avro schema: $sourceAvroSchema.\n" +
+              s"Target Catalyst type: $targetSqlType")
+      }
+    }
+    createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
+  }
+
+  /**
+   * Converts a Spark SQL type to an Avro type. Note that this method is not used to construct
+   * fields of an Avro record (convertFieldTypeToAvro is used for that).
+   */
+  private def convertTypeToAvro[T](
+      dataType: DataType,
+      schemaBuilder: BaseTypeBuilder[T],
+      structName: String,
+      recordNamespace: String): T = {
+    dataType match {
+      case ByteType => schemaBuilder.intType()
+      case ShortType => schemaBuilder.intType()
+      case IntegerType => schemaBuilder.intType()
+      case LongType => schemaBuilder.longType()
+      case FloatType => schemaBuilder.floatType()
+      case DoubleType => schemaBuilder.doubleType()
+      case _: DecimalType => schemaBuilder.stringType()
+      case StringType => schemaBuilder.stringType()
+      case BinaryType => schemaBuilder.bytesType()
+      case BooleanType => schemaBuilder.booleanType()
+      case TimestampType => schemaBuilder.longType()
+      case DateType => schemaBuilder.longType()
+
+      case ArrayType(elementType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+        val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace)
+        schemaBuilder.array().items(elementSchema)
+
+      case MapType(StringType, valueType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+        val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace)
+        schemaBuilder.map().values(valueSchema)
+
+      case structType: StructType =>
+        convertStructToAvro(
+          structType,
+          schemaBuilder.record(structName).namespace(recordNamespace),
+          recordNamespace)
+
+      case _ => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+    }
+  }
+
+  /**
+   * This function is used to construct fields of the avro record, where schema of the field is
+   * specified by avro representation of dataType. Since builders for record fields are different
+   * from those for everything else, we have to use a separate method.
+   */
+  private def convertFieldTypeToAvro[T](
+      dataType: DataType,
+      newFieldBuilder: BaseFieldTypeBuilder[T],
+      structName: String,
+      recordNamespace: String): FieldDefault[T, _] = {
+    dataType match {
+      case ByteType => newFieldBuilder.intType()
+      case ShortType => newFieldBuilder.intType()
+      case IntegerType => newFieldBuilder.intType()
+      case LongType => newFieldBuilder.longType()
+      case FloatType => newFieldBuilder.floatType()
+      case DoubleType => newFieldBuilder.doubleType()
+      case _: DecimalType => newFieldBuilder.stringType()
+      case StringType => newFieldBuilder.stringType()
+      case BinaryType => newFieldBuilder.bytesType()
+      case BooleanType => newFieldBuilder.booleanType()
+      case TimestampType => newFieldBuilder.longType()
+      case DateType => newFieldBuilder.longType()
+
+      case ArrayType(elementType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+        val elementSchema = convertTypeToAvro(
+          elementType,
+          builder,
+          structName,
+          getNewRecordNamespace(elementType, recordNamespace, structName))
+        newFieldBuilder.array().items(elementSchema)
+
+      case MapType(StringType, valueType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+        val valueSchema = convertTypeToAvro(
+          valueType,
+          builder,
+          structName,
+          getNewRecordNamespace(valueType, recordNamespace, structName))
+        newFieldBuilder.map().values(valueSchema)
+
+      case structType: StructType =>
+        convertStructToAvro(
+          structType,
+          newFieldBuilder.record(structName).namespace(s"$recordNamespace.$structName"),
+          s"$recordNamespace.$structName")
+
+      case _ => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+    }
+  }
+
+  /**
+   * Returns a new namespace depending on the data type of the element.
+   * If the data type is a StructType, it returns the current namespace concatenated
+   * with the element name; otherwise it returns the current namespace unchanged.
+   */
+  private[avro] def getNewRecordNamespace(
+      elementDataType: DataType,
+      currentRecordNamespace: String,
+      elementName: String): String = {
+
+    elementDataType match {
+      case StructType(_) => s"$currentRecordNamespace.$elementName"
+      case _ => currentRecordNamespace
+    }
+  }
+
+  private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = {
+    if (isNullable) {
+      SchemaBuilder.builder().nullable()
+    } else {
+      SchemaBuilder.builder()
+    }
+  }
+}
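The union handling in `toSqlType` above follows three rules: a union containing `null` maps to the remaining union marked nullable; the pairs `[int, long]` and `[float, double]` are promoted to `LongType` and `DoubleType`; and any other union becomes a struct with fields `member0`, `member1`, etc. A standalone sketch of those rules, using a toy ADT in place of `org.apache.avro.Schema` (the names `AvroType`, `AUnion`, etc. are illustrative, not the real Avro API):

```scala
// Toy model of Avro types; stands in for org.apache.avro.Schema in this sketch.
sealed trait AvroType
case object ANull extends AvroType
case object AInt extends AvroType
case object ALong extends AvroType
case object AFloat extends AvroType
case object ADouble extends AvroType
case class AUnion(members: List[AvroType]) extends AvroType

// Catalyst types are represented as plain strings here.
case class SchemaType(dataType: String, nullable: Boolean)

def toSqlType(avroType: AvroType): SchemaType = avroType match {
  case AInt    => SchemaType("IntegerType", nullable = false)
  case ALong   => SchemaType("LongType", nullable = false)
  case AFloat  => SchemaType("FloatType", nullable = false)
  case ADouble => SchemaType("DoubleType", nullable = false)
  case AUnion(members) if members.contains(ANull) =>
    // Eliminate the null branch and make a recursive call, as in the real code.
    val rest = members.filterNot(_ == ANull)
    val inner = if (rest.size == 1) toSqlType(rest.head) else toSqlType(AUnion(rest))
    inner.copy(nullable = true)
  case AUnion(List(only)) => toSqlType(only)
  case AUnion(ms) if ms.toSet == Set(AInt, ALong) => SchemaType("LongType", nullable = false)
  case AUnion(ms) if ms.toSet == Set(AFloat, ADouble) => SchemaType("DoubleType", nullable = false)
  case AUnion(ms) =>
    // Complex unions become structs with fields member0, member1, ...
    val fields = ms.zipWithIndex.map { case (m, i) => s"member$i: ${toSqlType(m).dataType}" }
    SchemaType(fields.mkString("StructType(", ", ", ")"), nullable = false)
  case other => throw new IllegalArgumentException(s"Unsupported type $other")
}
```

For example, `toSqlType(AUnion(List(ANull, AInt, ALong)))` yields a nullable `LongType`, mirroring how the real converter reads `["null", "int", "long"]`.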
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
new file mode 100755 (executable)
index 0000000..b3c8a66
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+package object avro {
+  /**
+   * Adds a method, `avro`, to DataFrameWriter that allows you to write Avro files with this
+   * data source, e.g. `df.write.avro(path)`.
+   */
+  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
+    def avro: String => Unit = writer.format("avro").save
+  }
+
+  /**
+   * Adds a method, `avro`, to DataFrameReader that allows you to read Avro files with this
+   * data source, e.g. `spark.read.avro(path)`.
+   */
+  implicit class AvroDataFrameReader(reader: DataFrameReader) {
+    def avro: String => DataFrame = reader.format("avro").load
+
+    @scala.annotation.varargs
+    def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
+  }
+}
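The package object relies on Scala's implicit-class enrichment pattern: an existing type gains an `avro` method without being modified. A minimal standalone illustration of the same pattern (the `FakeReader` type is a toy stand-in for `DataFrameReader`, not Spark's API):

```scala
// Toy reader type; its format/load methods mimic DataFrameReader's chaining style.
case class FakeReader(fmt: String = "") {
  def format(f: String): FakeReader = copy(fmt = f)
  def load(path: String): String = s"loaded $path as $fmt"
}

object AvroSyntax {
  // Enrichment: once imported, any FakeReader gains an `avro` method,
  // just as the package object above enriches DataFrameReader.
  implicit class AvroReaderOps(reader: FakeReader) {
    def avro(path: String): String = reader.format("avro").load(path)
  }
}

import AvroSyntax._
val result = FakeReader().avro("episodes.avro")
```

In the real API, importing `org.apache.spark.sql.avro._` enables `spark.read.avro(path)` and `df.write.avro(path)`, as the test suite later in this patch demonstrates.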
diff --git a/external/avro/src/test/resources/episodes.avro b/external/avro/src/test/resources/episodes.avro
new file mode 100644 (file)
index 0000000..58a028c
Binary files /dev/null and b/external/avro/src/test/resources/episodes.avro differ
diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties
new file mode 100644 (file)
index 0000000..c18a724
--- /dev/null
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+# Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+# File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Set the logger level of File Appender to INFO
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
\ No newline at end of file
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
new file mode 100755 (executable)
index 0000000..fece892
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
new file mode 100755 (executable)
index 0000000..1ca623a
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
new file mode 100755 (executable)
index 0000000..a12e945
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
new file mode 100755 (executable)
index 0000000..60c0956
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
new file mode 100755 (executable)
index 0000000..af56dfc
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
new file mode 100755 (executable)
index 0000000..87d7844
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
new file mode 100755 (executable)
index 0000000..c326fc4
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
new file mode 100755 (executable)
index 0000000..279f36c
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
new file mode 100755 (executable)
index 0000000..8d70f5d
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
new file mode 100755 (executable)
index 0000000..6839d72
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro differ
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
new file mode 100755 (executable)
index 0000000..aedc7f7
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro differ
diff --git a/external/avro/src/test/resources/test.avro b/external/avro/src/test/resources/test.avro
new file mode 100644 (file)
index 0000000..6425e21
Binary files /dev/null and b/external/avro/src/test/resources/test.avro differ
diff --git a/external/avro/src/test/resources/test.avsc b/external/avro/src/test/resources/test.avsc
new file mode 100644 (file)
index 0000000..d7119a0
--- /dev/null
@@ -0,0 +1,53 @@
+{
+  "type" : "record",
+  "name" : "test_schema",
+  "fields" : [{
+    "name" : "string",
+    "type" : "string",
+    "doc"  : "Meaningless string of characters"
+  }, {
+    "name" : "simple_map",
+    "type" : {"type": "map", "values": "int"}
+  }, {
+    "name" : "complex_map",
+    "type" : {"type": "map", "values": {"type": "map", "values": "string"}}
+  }, {
+    "name" : "union_string_null",
+    "type" : ["null", "string"]
+  }, {
+    "name" : "union_int_long_null",
+    "type" : ["int", "long", "null"]
+  }, {
+    "name" : "union_float_double",
+    "type" : ["float", "double"]
+  }, {
+    "name": "fixed3",
+    "type": {"type": "fixed", "size": 3, "name": "fixed3"}
+  }, {
+    "name": "fixed2",
+    "type": {"type": "fixed", "size": 2, "name": "fixed2"}
+  }, {
+    "name": "enum",
+    "type": { "type": "enum",
+              "name": "Suit",
+              "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+            }
+  }, {
+    "name": "record",
+    "type": {
+      "type": "record", 
+      "name": "record",
+      "aliases": ["RecordAlias"],
+      "fields" : [{
+        "name": "value_field",
+        "type": "string"
+      }]
+    }
+  }, {
+    "name": "array_of_boolean",
+    "type": {"type": "array", "items": "boolean"}
+  }, {
+    "name": "bytes",
+    "type": "bytes"
+  }]
+}
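Applying the converter rules from SchemaConverters to the three union fields in test.avsc gives the Spark-side types the test suite expects. A sketch of that mapping (Catalyst type names written as plain strings; this is not executed against Spark):

```scala
// Expected Catalyst type and nullability for each union field in test.avsc.
val expectedUnionFields = Map(
  "union_string_null"   -> ("StringType", true),   // ["null","string"]: nullable string
  "union_int_long_null" -> ("LongType", true),     // int|long promoted to long; null branch makes it nullable
  "union_float_double"  -> ("DoubleType", false)   // float|double promoted to double; no null branch
)
```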
diff --git a/external/avro/src/test/resources/test.json b/external/avro/src/test/resources/test.json
new file mode 100644 (file)
index 0000000..780189a
--- /dev/null
@@ -0,0 +1,42 @@
+{
+       "string": "OMG SPARK IS AWESOME",
+       "simple_map": {"abc": 1, "bcd": 7},
+       "complex_map": {"key": {"a": "b", "c": "d"}},
+       "union_string_null": {"string": "abc"},
+       "union_int_long_null": {"int": 1},
+       "union_float_double": {"float": 3.1415926535},
+       "fixed3":"\u0002\u0003\u0004",
+       "fixed2":"\u0011\u0012",
+       "enum": "SPADES",
+       "record": {"value_field": "Two things are infinite: the universe and human stupidity; and I'm not sure about universe."},
+       "array_of_boolean": [true, false, false],
+       "bytes": "\u0041\u0042\u0043"
+}
+{
+       "string": "Terran is IMBA!",
+       "simple_map": {"mmm": 0, "qqq": 66},
+       "complex_map": {"key": {"1": "2", "3": "4"}},
+       "union_string_null": {"string": "123"},
+       "union_int_long_null": {"long": 66},
+       "union_float_double": {"double": 6.6666666666666},
+       "fixed3":"\u0007\u0007\u0007",
+       "fixed2":"\u0001\u0002",
+       "enum": "CLUBS",
+       "record": {"value_field": "Life did not intend to make us perfect. Whoever is perfect belongs in a museum."},
+       "array_of_boolean": [],
+       "bytes": ""
+}
+{
+       "string": "The cake is a LIE!",
+       "simple_map": {},
+       "complex_map": {"key": {}},
+       "union_string_null": {"null": null},
+       "union_int_long_null": {"null": null},
+       "union_float_double": {"double": 0},
+       "fixed3":"\u0011\u0022\u0009",
+       "fixed2":"\u0010\u0090",
+       "enum": "DIAMONDS",
+       "record": {"value_field": "TEST_STR123"},
+       "array_of_boolean": [false],
+       "bytes": "\u0053"
+}
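In Avro's JSON encoding, a non-null union value is wrapped in a single-key object naming the chosen branch, which is why test.json writes `{"int": 1}` and `{"string": "abc"}` rather than bare values. A sketch of that wrapping rule in plain Scala (illustrative only, not the Avro library's encoder; the value is assumed to be already JSON-encoded):

```scala
// Wrap a union value in the branch-named object used by Avro's JSON encoding.
// A null branch is encoded as a bare JSON null, with no wrapper object.
def jsonEncodeUnion(branch: String, encodedValue: String): String =
  if (branch == "null") "null" else s"""{"$branch": $encodedValue}"""

val sample = jsonEncodeUnion("int", "1")
```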
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
new file mode 100644 (file)
index 0000000..c6c1e40
--- /dev/null
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.nio.file.Files
+import java.sql.{Date, Timestamp}
+import java.util.{TimeZone, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.{Field, Type}
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
+import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql._
+import org.apache.spark.sql.avro.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types._
+
+class AvroSuite extends SparkFunSuite {
+  val episodesFile = "src/test/resources/episodes.avro"
+  val testFile = "src/test/resources/test.avro"
+
+  private var spark: SparkSession = _
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local[2]")
+      .appName("AvroSuite")
+      .config("spark.sql.files.maxPartitionBytes", 1024)
+      .getOrCreate()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      spark.sparkContext.stop()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("reading from multiple paths") {
+    val df = spark.read.avro(episodesFile, episodesFile)
+    assert(df.count == 16)
+  }
+
+  test("reading and writing partitioned data") {
+    val df = spark.read.avro(episodesFile)
+    val fields = List("title", "air_date", "doctor")
+    for (field <- fields) {
+      TestUtils.withTempDir { dir =>
+        val outputDir = s"$dir/${UUID.randomUUID}"
+        df.write.partitionBy(field).avro(outputDir)
+        val input = spark.read.avro(outputDir)
+        // Make sure that no fields got dropped.
+        // We convert Rows to Seqs in order to work around SPARK-10325.
+        assert(input.select(field).collect().map(_.toSeq).toSet ===
+          df.select(field).collect().map(_.toSeq).toSet)
+      }
+    }
+  }
+
+  test("request no fields") {
+    val df = spark.read.avro(episodesFile)
+    df.registerTempTable("avro_table")
+    assert(spark.sql("select count(*) from avro_table").collect().head === Row(8))
+  }
+
+  test("convert formats") {
+    TestUtils.withTempDir { dir =>
+      val df = spark.read.avro(episodesFile)
+      df.write.parquet(dir.getCanonicalPath)
+      assert(spark.read.parquet(dir.getCanonicalPath).count() === df.count)
+    }
+  }
+
+  test("rearrange internal schema") {
+    TestUtils.withTempDir { dir =>
+      val df = spark.read.avro(episodesFile)
+      df.select("doctor", "title").write.avro(dir.getCanonicalPath)
+    }
+  }
+
+  test("test NULL avro type") {
+    TestUtils.withTempDir { dir =>
+      val fields = Seq(new Field("null", Schema.create(Type.NULL), "doc", null)).asJava
+      val schema = Schema.createRecord("name", "docs", "namespace", false)
+      schema.setFields(fields)
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
+      val avroRec = new GenericData.Record(schema)
+      avroRec.put("null", null)
+      dataFileWriter.append(avroRec)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+
+      intercept[IncompatibleSchemaException] {
+        spark.read.avro(s"$dir.avro")
+      }
+    }
+  }
+
+  test("union(int, long) is read as long") {
+    TestUtils.withTempDir { dir =>
+      val avroSchema: Schema = {
+        val union =
+          Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG)).asJava)
+        val fields = Seq(new Field("field1", union, "doc", null)).asJava
+        val schema = Schema.createRecord("name", "docs", "namespace", false)
+        schema.setFields(fields)
+        schema
+      }
+
+      val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(avroSchema, new File(s"$dir.avro"))
+      val rec1 = new GenericData.Record(avroSchema)
+      rec1.put("field1", 1.toLong)
+      dataFileWriter.append(rec1)
+      val rec2 = new GenericData.Record(avroSchema)
+      rec2.put("field1", 2)
+      dataFileWriter.append(rec2)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+      val df = spark.read.avro(s"$dir.avro")
+      assert(df.schema.fields === Seq(StructField("field1", LongType, nullable = true)))
+      assert(df.collect().toSet == Set(Row(1L), Row(2L)))
+    }
+  }
+
+  test("union(float, double) is read as double") {
+    TestUtils.withTempDir { dir =>
+      val avroSchema: Schema = {
+        val union =
+          Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE)).asJava)
+        val fields = Seq(new Field("field1", union, "doc", null)).asJava
+        val schema = Schema.createRecord("name", "docs", "namespace", false)
+        schema.setFields(fields)
+        schema
+      }
+
+      val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(avroSchema, new File(s"$dir.avro"))
+      val rec1 = new GenericData.Record(avroSchema)
+      rec1.put("field1", 1.toFloat)
+      dataFileWriter.append(rec1)
+      val rec2 = new GenericData.Record(avroSchema)
+      rec2.put("field1", 2.toDouble)
+      dataFileWriter.append(rec2)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+      val df = spark.read.avro(s"$dir.avro")
+      assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true)))
+      assert(df.collect().toSet == Set(Row(1.toDouble), Row(2.toDouble)))
+    }
+  }
+
+  test("union(float, double, null) is read as nullable double") {
+    TestUtils.withTempDir { dir =>
+      val avroSchema: Schema = {
+        val union = Schema.createUnion(
+          List(Schema.create(Type.FLOAT),
+            Schema.create(Type.DOUBLE),
+            Schema.create(Type.NULL)
+          ).asJava
+        )
+        val fields = Seq(new Field("field1", union, "doc", null)).asJava
+        val schema = Schema.createRecord("name", "docs", "namespace", false)
+        schema.setFields(fields)
+        schema
+      }
+
+      val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(avroSchema, new File(s"$dir.avro"))
+      val rec1 = new GenericData.Record(avroSchema)
+      rec1.put("field1", 1.toFloat)
+      dataFileWriter.append(rec1)
+      val rec2 = new GenericData.Record(avroSchema)
+      rec2.put("field1", null)
+      dataFileWriter.append(rec2)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+      val df = spark.read.avro(s"$dir.avro")
+      assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true)))
+      assert(df.collect().toSet == Set(Row(1.toDouble), Row(null)))
+    }
+  }
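
The three union tests above all follow one resolution rule: a `null` branch only toggles nullability, and a surviving numeric pair is widened to the larger type. A minimal sketch of that rule (`resolveUnion` is a hypothetical helper for illustration, not the actual `SchemaConverters` code, which handles many more cases):

```scala
sealed trait SparkType
case object IntT extends SparkType
case object LongT extends SparkType
case object DoubleT extends SparkType

// Hypothetical mirror of the union-resolution rule these tests exercise:
// drop the null branch (it only affects nullability), widen what remains.
def resolveUnion(branches: Seq[String]): (SparkType, Boolean) = {
  val nullable = branches.contains("null")
  val tpe = branches.filterNot(_ == "null").sorted match {
    case Seq("int", "long")     => LongT   // int widens to long
    case Seq("double", "float") => DoubleT // float widens to double
    case Seq("int")             => IntT    // single-branch union collapses
    case other                  => sys.error(s"not sketched: $other")
  }
  (tpe, nullable)
}
```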
+
+  test("Union of a single type") {
+    TestUtils.withTempDir { dir =>
+      val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT)).asJava)
+      val fields = Seq(new Field("field1", UnionOfOne, "doc", null)).asJava
+      val schema = Schema.createRecord("name", "docs", "namespace", false)
+      schema.setFields(fields)
+
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
+      val avroRec = new GenericData.Record(schema)
+
+      avroRec.put("field1", 8)
+
+      dataFileWriter.append(avroRec)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+
+      val df = spark.read.avro(s"$dir.avro")
+      assert(df.first() == Row(8))
+    }
+  }
+
+  test("Complex Union Type") {
+    TestUtils.withTempDir { dir =>
+      val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)
+      val enumSchema = Schema.createEnum("enum_name", "doc", "namespace", List("e1", "e2").asJava)
+      val complexUnionType = Schema.createUnion(
+        List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema).asJava)
+      val fields = Seq(
+        new Field("field1", complexUnionType, "doc", null),
+        new Field("field2", complexUnionType, "doc", null),
+        new Field("field3", complexUnionType, "doc", null),
+        new Field("field4", complexUnionType, "doc", null)
+      ).asJava
+      val schema = Schema.createRecord("name", "docs", "namespace", false)
+      schema.setFields(fields)
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
+      val avroRec = new GenericData.Record(schema)
+      val field1 = 1234
+      val field2 = "Hope that was not load bearing"
+      val field3 = Array[Byte](1, 2, 3, 4)
+      val field4 = "e2"
+      avroRec.put("field1", field1)
+      avroRec.put("field2", field2)
+      avroRec.put("field3", new Fixed(fixedSchema, field3))
+      avroRec.put("field4", new EnumSymbol(enumSchema, field4))
+      dataFileWriter.append(avroRec)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+
+      val df = spark.sqlContext.read.avro(s"$dir.avro")
+      assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
+      assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
+      assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
+      assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+    }
+  }
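
As the `selectExpr` calls above show, a union with several non-null branches is read as a struct with one field per branch, named `member0`, `member1`, ... in branch order; per row, only the field matching the branch that was written is populated. A trivial sketch of the positional naming:

```scala
// Sketch of the positional field naming used when a complex union is
// surfaced as a Spark SQL struct
def unionFieldNames(nonNullBranches: Int): Seq[String] =
  (0 until nonNullBranches).map(i => s"member$i")
```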
+
+  test("Lots of nulls") {
+    TestUtils.withTempDir { dir =>
+      val schema = StructType(Seq(
+        StructField("binary", BinaryType, true),
+        StructField("timestamp", TimestampType, true),
+        StructField("array", ArrayType(ShortType), true),
+        StructField("map", MapType(StringType, StringType), true),
+        StructField("struct", StructType(Seq(StructField("int", IntegerType, true))))))
+      val rdd = spark.sparkContext.parallelize(Seq[Row](
+        Row(null, new Timestamp(1), Array[Short](1, 2, 3), null, null),
+        Row(null, null, null, null, null),
+        Row(null, null, null, null, null),
+        Row(null, null, null, null, null)))
+      val df = spark.createDataFrame(rdd, schema)
+      df.write.avro(dir.toString)
+      assert(spark.read.avro(dir.toString).count == rdd.count)
+    }
+  }
+
+  test("Struct field type") {
+    TestUtils.withTempDir { dir =>
+      val schema = StructType(Seq(
+        StructField("float", FloatType, true),
+        StructField("short", ShortType, true),
+        StructField("byte", ByteType, true),
+        StructField("boolean", BooleanType, true)
+      ))
+      val rdd = spark.sparkContext.parallelize(Seq(
+        Row(1f, 1.toShort, 1.toByte, true),
+        Row(2f, 2.toShort, 2.toByte, true),
+        Row(3f, 3.toShort, 3.toByte, true)
+      ))
+      val df = spark.createDataFrame(rdd, schema)
+      df.write.avro(dir.toString)
+      assert(spark.read.avro(dir.toString).count == rdd.count)
+    }
+  }
+
+  test("Date field type") {
+    TestUtils.withTempDir { dir =>
+      val schema = StructType(Seq(
+        StructField("float", FloatType, true),
+        StructField("date", DateType, true)
+      ))
+      TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+      val rdd = spark.sparkContext.parallelize(Seq(
+        Row(1f, null),
+        Row(2f, new Date(1451948400000L)),
+        Row(3f, new Date(1460066400500L))
+      ))
+      val df = spark.createDataFrame(rdd, schema)
+      df.write.avro(dir.toString)
+      assert(spark.read.avro(dir.toString).count == rdd.count)
+      assert(spark.read.avro(dir.toString).select("date").collect().map(_(0)).toSet ==
+        Array(null, 1451865600000L, 1459987200000L).toSet)
+    }
+  }
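
The expected longs in the assertion above are the written epoch millis floored to the start of their UTC day, since `DateType` carries only day precision (hence the explicit UTC default time zone in the test). A quick check of that arithmetic:

```scala
// DateType keeps day precision: flooring epoch millis to the UTC day start
// reproduces the expected values in the test
val msPerDay = 86400000L
def floorToUtcDay(epochMs: Long): Long = (epochMs / msPerDay) * msPerDay

println(floorToUtcDay(1451948400000L)) // 1451865600000
println(floorToUtcDay(1460066400500L)) // 1459987200000
```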
+
+  test("Array data types") {
+    TestUtils.withTempDir { dir =>
+      val testSchema = StructType(Seq(
+        StructField("byte_array", ArrayType(ByteType), true),
+        StructField("short_array", ArrayType(ShortType), true),
+        StructField("float_array", ArrayType(FloatType), true),
+        StructField("bool_array", ArrayType(BooleanType), true),
+        StructField("long_array", ArrayType(LongType), true),
+        StructField("double_array", ArrayType(DoubleType), true),
+        StructField("decimal_array", ArrayType(DecimalType(10, 0)), true),
+        StructField("bin_array", ArrayType(BinaryType), true),
+        StructField("timestamp_array", ArrayType(TimestampType), true),
+        StructField("array_array", ArrayType(ArrayType(StringType), true), true),
+        StructField("struct_array", ArrayType(
+          StructType(Seq(StructField("name", StringType, true)))))))
+
+      val arrayOfByte = new Array[Byte](4)
+      for (i <- arrayOfByte.indices) {
+        arrayOfByte(i) = i.toByte
+      }
+
+      val rdd = spark.sparkContext.parallelize(Seq(
+        Row(arrayOfByte, Array[Short](1, 2, 3, 4), Array[Float](1f, 2f, 3f, 4f),
+          Array[Boolean](true, false, true, false), Array[Long](1L, 2L), Array[Double](1.0, 2.0),
+          Array[BigDecimal](BigDecimal.valueOf(3)), Array[Array[Byte]](arrayOfByte, arrayOfByte),
+          Array[Timestamp](new Timestamp(0)),
+          Array[Array[String]](Array[String]("CSH, tearing down the walls that divide us", "-jd")),
+          Array[Row](Row("Bobby G. can't swim")))))
+      val df = spark.createDataFrame(rdd, testSchema)
+      df.write.avro(dir.toString)
+      assert(spark.read.avro(dir.toString).count == rdd.count)
+    }
+  }
+
+  test("write with compression") {
+    TestUtils.withTempDir { dir =>
+      val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
+      val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
+      val uncompressDir = s"$dir/uncompress"
+      val deflateDir = s"$dir/deflate"
+      val snappyDir = s"$dir/snappy"
+      val fakeDir = s"$dir/fake"
+
+      val df = spark.read.avro(testFile)
+      spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed")
+      df.write.avro(uncompressDir)
+      spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate")
+      spark.conf.set(AVRO_DEFLATE_LEVEL, "9")
+      df.write.avro(deflateDir)
+      spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy")
+      df.write.avro(snappyDir)
+
+      val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
+      val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
+      val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
+
+      assert(uncompressSize > deflateSize)
+      assert(snappySize > deflateSize)
+    }
+  }
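
The size ordering asserted above (uncompressed > snappy > deflate at level 9) follows from the codecs themselves: deflate at a high level trades CPU for ratio, while snappy favors speed. A standalone illustration with `java.util.zip.Deflater`, independent of the Avro writer:

```scala
import java.util.zip.Deflater

// Compress highly repetitive input; deflate shrinks it far below its raw size
def deflatedSize(data: Array[Byte], level: Int): Int = {
  val d = new Deflater(level)
  d.setInput(data)
  d.finish()
  val out = new Array[Byte](data.length * 2 + 64)
  val n = d.deflate(out) // one call suffices: buffer exceeds the worst case
  d.end()
  n
}

val raw = Array.fill[Byte](10000)('a'.toByte)
```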
+
+  test("dsl test") {
+    val results = spark.read.avro(episodesFile).select("title").collect()
+    assert(results.length === 8)
+  }
+
+  test("support of various data types") {
+    // This test uses data from test.avro. You can see the data and the schema of this file in
+    // test.json and test.avsc
+    val all = spark.read.avro(testFile).collect()
+    assert(all.length == 3)
+
+    val str = spark.read.avro(testFile).select("string").collect()
+    assert(str.map(_(0)).toSet.contains("Terran is IMBA!"))
+
+    val simple_map = spark.read.avro(testFile).select("simple_map").collect()
+    assert(simple_map(0)(0).getClass.toString.contains("Map"))
+    assert(simple_map.map(_(0).asInstanceOf[Map[String, Some[Int]]].size).toSet == Set(2, 0))
+
+    val union0 = spark.read.avro(testFile).select("union_string_null").collect()
+    assert(union0.map(_(0)).toSet == Set("abc", "123", null))
+
+    val union1 = spark.read.avro(testFile).select("union_int_long_null").collect()
+    assert(union1.map(_(0)).toSet == Set(66, 1, null))
+
+    val union2 = spark.read.avro(testFile).select("union_float_double").collect()
+    assert(
+      union2
+        .map(x => new java.lang.Double(x(0).toString))
+        .exists(p => Math.abs(p - Math.PI) < 0.001))
+
+    val fixed = spark.read.avro(testFile).select("fixed3").collect()
+    assert(fixed.map(_(0).asInstanceOf[Array[Byte]]).exists(p => p(1) == 3))
+
+    val enum = spark.read.avro(testFile).select("enum").collect()
+    assert(enum.map(_(0)).toSet == Set("SPADES", "CLUBS", "DIAMONDS"))
+
+    val record = spark.read.avro(testFile).select("record").collect()
+    assert(record(0)(0).getClass.toString.contains("Row"))
+    assert(record.map(_(0).asInstanceOf[Row](0)).contains("TEST_STR123"))
+
+    val array_of_boolean = spark.read.avro(testFile).select("array_of_boolean").collect()
+    assert(array_of_boolean.map(_(0).asInstanceOf[Seq[Boolean]].size).toSet == Set(3, 1, 0))
+
+    val bytes = spark.read.avro(testFile).select("bytes").collect()
+    assert(bytes.map(_(0).asInstanceOf[Array[Byte]].length).toSet == Set(3, 1, 0))
+  }
+
+  test("sql test") {
+    spark.sql(
+      s"""
+         |CREATE TEMPORARY TABLE avroTable
+         |USING avro
+         |OPTIONS (path "$episodesFile")
+      """.stripMargin.replaceAll("\n", " "))
+
+    assert(spark.sql("SELECT * FROM avroTable").collect().length === 8)
+  }
+
+  test("conversion to avro and back") {
+    // Note that test.avro includes a variety of types, some of which are nullable. We expect to
+    // get the same values back.
+    TestUtils.withTempDir { dir =>
+      val avroDir = s"$dir/avro"
+      spark.read.avro(testFile).write.avro(avroDir)
+      TestUtils.checkReloadMatchesSaved(spark, testFile, avroDir)
+    }
+  }
+
+  test("conversion to avro and back with namespace") {
+    // Note that test.avro includes a variety of types, some of which are nullable. We expect to
+    // get the same values back.
+    TestUtils.withTempDir { tempDir =>
+      val name = "AvroTest"
+      val namespace = "com.databricks.spark.avro"
+      val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
+
+      val avroDir = tempDir + "/namedAvro"
+      spark.read.avro(testFile).write.options(parameters).avro(avroDir)
+      TestUtils.checkReloadMatchesSaved(spark, testFile, avroDir)
+
+      // Look at the raw file and make sure it has namespace info
+      val rawSaved = spark.sparkContext.textFile(avroDir)
+      val schema = rawSaved.collect().mkString("")
+      assert(schema.contains(name))
+      assert(schema.contains(namespace))
+    }
+  }
+
+  test("converting some specific sparkSQL types to avro") {
+    TestUtils.withTempDir { tempDir =>
+      val testSchema = StructType(Seq(
+        StructField("Name", StringType, false),
+        StructField("Length", IntegerType, true),
+        StructField("Time", TimestampType, false),
+        StructField("Decimal", DecimalType(10, 2), true),
+        StructField("Binary", BinaryType, false)))
+
+      val arrayOfByte = new Array[Byte](4)
+      for (i <- arrayOfByte.indices) {
+        arrayOfByte(i) = i.toByte
+      }
+      val cityRDD = spark.sparkContext.parallelize(Seq(
+        Row("San Francisco", 12, new Timestamp(666), null, arrayOfByte),
+        Row("Palo Alto", null, new Timestamp(777), null, arrayOfByte),
+        Row("Munich", 8, new Timestamp(42), Decimal(3.14), arrayOfByte)))
+      val cityDataFrame = spark.createDataFrame(cityRDD, testSchema)
+
+      val avroDir = tempDir + "/avro"
+      cityDataFrame.write.avro(avroDir)
+      assert(spark.read.avro(avroDir).collect().length == 3)
+
+      // Timestamps are converted to longs
+      val times = spark.read.avro(avroDir).select("Time").collect()
+      assert(times.map(_(0)).toSet == Set(666, 777, 42))
+
+      // DecimalType should be converted to string
+      val decimals = spark.read.avro(avroDir).select("Decimal").collect()
+      assert(decimals.map(_(0)).contains("3.14"))
+
+      // There should be a null entry
+      val length = spark.read.avro(avroDir).select("Length").collect()
+      assert(length.map(_(0)).contains(null))
+
+      val binary = spark.read.avro(avroDir).select("Binary").collect()
+      for (i <- arrayOfByte.indices) {
+        assert(binary(1)(0).asInstanceOf[Array[Byte]](i) == arrayOfByte(i))
+      }
+    }
+  }
+
+  test("correctly read long as date/timestamp type") {
+    TestUtils.withTempDir { tempDir =>
+      val sparkSession = spark
+      import sparkSession.implicits._
+
+      val currentTime = new Timestamp(System.currentTimeMillis())
+      val currentDate = new Date(System.currentTimeMillis())
+      val schema = StructType(Seq(
+        StructField("_1", DateType, false), StructField("_2", TimestampType, false)))
+      val writeDs = Seq((currentDate, currentTime)).toDS
+
+      val avroDir = tempDir + "/avro"
+      writeDs.write.avro(avroDir)
+      assert(spark.read.avro(avroDir).collect().length == 1)
+
+      val readDs = spark.read.schema(schema).avro(avroDir).as[(Date, Timestamp)]
+
+      assert(readDs.collect().sameElements(writeDs.collect()))
+    }
+  }
+
+  test("support of globbed paths") {
+    val e1 = spark.read.avro("*/test/resources/episodes.avro").collect()
+    assert(e1.length == 8)
+
+    val e2 = spark.read.avro("src/*/*/episodes.avro").collect()
+    assert(e2.length == 8)
+  }
+
+  test("does not coerce null date/timestamp value to 0 epoch.") {
+    TestUtils.withTempDir { tempDir =>
+      val sparkSession = spark
+      import sparkSession.implicits._
+
+      val nullTime: Timestamp = null
+      val nullDate: Date = null
+      val schema = StructType(Seq(
+        StructField("_1", DateType, nullable = true),
+        StructField("_2", TimestampType, nullable = true))
+      )
+      val writeDs = Seq((nullDate, nullTime)).toDS
+
+      val avroDir = tempDir + "/avro"
+      writeDs.write.avro(avroDir)
+      val readValues = spark.read.schema(schema).avro(avroDir).as[(Date, Timestamp)].collect
+
+      assert(readValues.size == 1)
+      assert(readValues.head == ((nullDate, nullTime)))
+    }
+  }
+
+  test("support user provided avro schema") {
+    val avroSchema =
+      """
+        |{
+        |  "type" : "record",
+        |  "name" : "test_schema",
+        |  "fields" : [{
+        |    "name" : "string",
+        |    "type" : "string",
+        |    "doc"  : "Meaningless string of characters"
+        |  }]
+        |}
+      """.stripMargin
+    val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema).avro(testFile).collect()
+    val expected = spark.read.avro(testFile).select("string").collect()
+    assert(result.sameElements(expected))
+  }
+
+  test("support user provided avro schema with defaults for missing fields") {
+    val avroSchema =
+      """
+        |{
+        |  "type" : "record",
+        |  "name" : "test_schema",
+        |  "fields" : [{
+        |    "name"    : "missingField",
+        |    "type"    : "string",
+        |    "default" : "foo"
+        |  }]
+        |}
+      """.stripMargin
+    val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema)
+      .avro(testFile).select("missingField").first
+    assert(result === Row("foo"))
+  }
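
The test above leans on standard Avro schema resolution: a field present in the reader schema but absent from the written data is filled from its declared default. A sketch of just that rule (`resolveField` is a hypothetical helper, not the Avro library API):

```scala
// Hypothetical illustration of reader-schema default resolution: take the
// writer's value if present, otherwise fall back to the declared default
def resolveField(writerRecord: Map[String, Any],
                 name: String,
                 default: Option[Any]): Any =
  writerRecord.getOrElse(name,
    default.getOrElse(sys.error(s"field $name missing with no default")))
```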
+
+  test("reading from invalid path throws exception") {
+    // The given directory contains no Avro files
+    intercept[AnalysisException] {
+      TestUtils.withTempDir(dir => spark.read.avro(dir.getCanonicalPath))
+    }
+
+    intercept[AnalysisException] {
+      spark.read.avro("very/invalid/path/123.avro")
+    }
+
+      // A globbed path that matches nothing throws a different exception
+      // (with a helpful message)
+    intercept[AnalysisException] {
+      spark.read.avro("*/*/*/*/*/*/*/something.avro")
+    }
+
+    intercept[FileNotFoundException] {
+      TestUtils.withTempDir { dir =>
+        FileUtils.touch(new File(dir, "test"))
+        spark.read.avro(dir.toString)
+      }
+    }
+  }
+
+  test("SQL test insert overwrite") {
+    TestUtils.withTempDir { tempDir =>
+      val tempEmptyDir = s"$tempDir/sqlOverwrite"
+      // Create a temp directory for table that will be overwritten
+      new File(tempEmptyDir).mkdirs()
+      spark.sql(
+        s"""
+           |CREATE TEMPORARY TABLE episodes
+           |USING avro
+           |OPTIONS (path "$episodesFile")
+         """.stripMargin.replaceAll("\n", " "))
+      spark.sql(
+        s"""
+           |CREATE TEMPORARY TABLE episodesEmpty
+           |(name string, air_date string, doctor int)
+           |USING avro
+           |OPTIONS (path "$tempEmptyDir")
+         """.stripMargin.replaceAll("\n", " "))
+
+      assert(spark.sql("SELECT * FROM episodes").collect().length === 8)
+      assert(spark.sql("SELECT * FROM episodesEmpty").collect().isEmpty)
+
+      spark.sql(
+        s"""
+           |INSERT OVERWRITE TABLE episodesEmpty
+           |SELECT * FROM episodes
+         """.stripMargin.replaceAll("\n", " "))
+      assert(spark.sql("SELECT * FROM episodesEmpty").collect().length == 8)
+    }
+  }
+
+  test("test save and load") {
+    // Test if load works as expected
+    TestUtils.withTempDir { tempDir =>
+      val df = spark.read.avro(episodesFile)
+      assert(df.count == 8)
+
+      val tempSaveDir = s"$tempDir/save/"
+
+      df.write.avro(tempSaveDir)
+      val newDf = spark.read.avro(tempSaveDir)
+      assert(newDf.count == 8)
+    }
+  }
+
+  test("test load with non-Avro file") {
+    // Test if load works as expected
+    TestUtils.withTempDir { tempDir =>
+      val df = spark.read.avro(episodesFile)
+      assert(df.count == 8)
+
+      val tempSaveDir = s"$tempDir/save/"
+      df.write.avro(tempSaveDir)
+
+      Files.createFile(new File(tempSaveDir, "non-avro").toPath)
+
+      val newDf = spark
+        .read
+        .option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        .avro(tempSaveDir)
+
+      assert(newDf.count == 8)
+    }
+  }
+
+  test("read avro with user defined schema: read partial columns") {
+    val partialColumns = StructType(Seq(
+      StructField("string", StringType, false),
+      StructField("simple_map", MapType(StringType, IntegerType), false),
+      StructField("complex_map", MapType(StringType, MapType(StringType, StringType)), false),
+      StructField("union_string_null", StringType, true),
+      StructField("union_int_long_null", LongType, true),
+      StructField("fixed3", BinaryType, true),
+      StructField("fixed2", BinaryType, true),
+      StructField("enum", StringType, false),
+      StructField("record", StructType(Seq(StructField("value_field", StringType, false))), false),
+      StructField("array_of_boolean", ArrayType(BooleanType), false),
+      StructField("bytes", BinaryType, true)))
+    val withSchema = spark.read.schema(partialColumns).avro(testFile).collect()
+    val withOutSchema = spark
+      .read
+      .avro(testFile)
+      .select("string", "simple_map", "complex_map", "union_string_null", "union_int_long_null",
+        "fixed3", "fixed2", "enum", "record", "array_of_boolean", "bytes")
+      .collect()
+    assert(withSchema.sameElements(withOutSchema))
+  }
+
+  test("read avro with user defined schema: read non-exist columns") {
+    val schema =
+      StructType(
+        Seq(
+          StructField("non_exist_string", StringType, true),
+          StructField(
+            "record",
+            StructType(Seq(
+              StructField("non_exist_field", StringType, false),
+              StructField("non_exist_field2", StringType, false))),
+            false)))
+    val withEmptyColumn = spark.read.schema(schema).avro(testFile).collect()
+
+    assert(withEmptyColumn.forall(_ == Row(null: String, Row(null: String, null: String))))
+  }
+
+  test("read avro file partitioned") {
+    TestUtils.withTempDir { dir =>
+      val sparkSession = spark
+      import sparkSession.implicits._
+      val df = (0 to 1024 * 3).toDS.map(i => s"record${i}").toDF("records")
+      val outputDir = s"$dir/${UUID.randomUUID}"
+      df.write.avro(outputDir)
+      val input = spark.read.avro(outputDir)
+      assert(input.collect.toSet.size === 1024 * 3 + 1)
+      assert(input.rdd.partitions.size > 2)
+    }
+  }
+
+  case class NestedBottom(id: Int, data: String)
+
+  case class NestedMiddle(id: Int, data: NestedBottom)
+
+  case class NestedTop(id: Int, data: NestedMiddle)
+
+  test("saving avro that has nested records with the same name") {
+    TestUtils.withTempDir { tempDir =>
+      // Save an Avro file to the output folder
+      val writeDf = spark.createDataFrame(List(NestedTop(1, NestedMiddle(2, NestedBottom(3, "1")))))
+      val outputFolder = s"$tempDir/duplicate_names/"
+      writeDf.write.avro(outputFolder)
+      // Read the Avro file saved in the previous step
+      val readDf = spark.read.avro(outputFolder)
+      // Check that the read DataFrame equals the written DataFrame
+      assert(readDf.collect().sameElements(writeDf.collect()))
+    }
+  }
+
+  case class NestedMiddleArray(id: Int, data: Array[NestedBottom])
+
+  case class NestedTopArray(id: Int, data: NestedMiddleArray)
+
+  test("saving avro that has nested records with the same name inside an array") {
+    TestUtils.withTempDir { tempDir =>
+      // Save an Avro file to the output folder
+      val writeDf = spark.createDataFrame(
+        List(NestedTopArray(1, NestedMiddleArray(2, Array(
+          NestedBottom(3, "1"), NestedBottom(4, "2")
+        ))))
+      )
+      val outputFolder = s"$tempDir/duplicate_names_array/"
+      writeDf.write.avro(outputFolder)
+      // Read the Avro file saved in the previous step
+      val readDf = spark.read.avro(outputFolder)
+      // Check that the read DataFrame equals the written DataFrame
+      assert(readDf.collect().sameElements(writeDf.collect()))
+    }
+  }
+
+  case class NestedMiddleMap(id: Int, data: Map[String, NestedBottom])
+
+  case class NestedTopMap(id: Int, data: NestedMiddleMap)
+
+  test("saving avro that has nested records with the same name inside a map") {
+    TestUtils.withTempDir { tempDir =>
+      // Save an Avro file to the output folder
+      val writeDf = spark.createDataFrame(
+        List(NestedTopMap(1, NestedMiddleMap(2, Map(
+          "1" -> NestedBottom(3, "1"), "2" -> NestedBottom(4, "2")
+        ))))
+      )
+      val outputFolder = s"$tempDir/duplicate_names_map/"
+      writeDf.write.avro(outputFolder)
+      // Read the Avro file saved in the previous step
+      val readDf = spark.read.avro(outputFolder)
+      // Check that the read DataFrame equals the written DataFrame
+      assert(readDf.collect().sameElements(writeDf.collect()))
+    }
+  }
+}
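
The three duplicate-name round trips above work because Avro requires record fullnames to be unique within a schema, so a writer must disambiguate recurring record names, for example by qualifying each nested record with a namespace built from its field path. A sketch of such a scheme (the exact naming here is an assumption for illustration, not taken from the implementation):

```scala
// Assumed disambiguation scheme: qualify each nested record type by the
// path of field names leading to it, so repeated names get distinct fullnames
def qualifiedName(fieldPath: Seq[String], recordName: String): String =
  (fieldPath :+ recordName).mkString(".")
```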
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala
new file mode 100755 (executable)
index 0000000..a0f8851
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}
+
+class SerializableConfigurationSuite extends SparkFunSuite {
+
+  private def testSerialization(serializer: SerializerInstance): Unit = {
+    import AvroFileFormat.SerializableConfiguration
+    val conf = new SerializableConfiguration(new Configuration())
+
+    val serialized = serializer.serialize(conf)
+
+    serializer.deserialize[Any](serialized) match {
+      case c: SerializableConfiguration =>
+        assert(c.log != null, "log was null")
+        assert(c.value != null, "value was null")
+      case other => fail(
+        s"Expecting ${classOf[SerializableConfiguration]}, but got ${other.getClass}.")
+    }
+  }
+
+  test("serialization with JavaSerializer") {
+    testSerialization(new JavaSerializer(new SparkConf()).newInstance())
+  }
+
+  test("serialization with KryoSerializer") {
+    testSerialization(new KryoSerializer(new SparkConf()).newInstance())
+  }
+
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala
new file mode 100755 (executable)
index 0000000..4ae9b14
--- /dev/null
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.{File, IOException}
+import java.nio.ByteBuffer
+import java.util
+
+import scala.collection.immutable.HashSet
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import com.google.common.io.Files
+
+import org.apache.spark.sql.SparkSession
+
+private[avro] object TestUtils {
+
+  /**
+   * This function checks that all records in a file match the original
+   * record.
+   */
+  def checkReloadMatchesSaved(spark: SparkSession, testFile: String, avroDir: String): Unit = {
+
+    def convertToString(elem: Any): String = {
+      elem match {
+        case null => "NULL" // HashSets can't have null in them, so we use a string instead
+        case arrayBuf: ArrayBuffer[_] =>
+          arrayBuf.asInstanceOf[ArrayBuffer[Any]].toArray.deep.mkString(" ")
+        case arrayByte: Array[Byte] => arrayByte.deep.mkString(" ")
+        case other => other.toString
+      }
+    }
+
+    val originalEntries = spark.read.avro(testFile).collect()
+    val newEntries = spark.read.avro(avroDir).collect()
+
+    assert(originalEntries.length == newEntries.length)
+
+    val origEntrySet = Array.fill(originalEntries(0).size)(new HashSet[Any]())
+    for (origEntry <- originalEntries) {
+      var idx = 0
+      for (origElement <- origEntry.toSeq) {
+        origEntrySet(idx) += convertToString(origElement)
+        idx += 1
+      }
+    }
+
+    for (newEntry <- newEntries) {
+      var idx = 0
+      for (newElement <- newEntry.toSeq) {
+        assert(origEntrySet(idx).contains(convertToString(newElement)))
+        idx += 1
+      }
+    }
+  }
+
+  def withTempDir(f: File => Unit): Unit = {
+    val dir = Files.createTempDir()
+    dir.delete()
+    try f(dir) finally deleteRecursively(dir)
+  }
+
+  /**
+   * This function deletes a file or a directory with everything that's in it. This function is
+   * copied from Spark with minor modifications made to it. See original source at:
+   * github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala
+   */
+  def deleteRecursively(file: File) {
+    def listFilesSafely(file: File): Seq[File] = {
+      if (file.exists()) {
+        val files = file.listFiles()
+        if (files == null) {
+          throw new IOException("Failed to list files for dir: " + file)
+        }
+        files
+      } else {
+        List()
+      }
+    }
+
+    if (file != null) {
+      try {
+        if (file.isDirectory) {
+          var savedIOException: IOException = null
+          for (child <- listFilesSafely(file)) {
+            try {
+              deleteRecursively(child)
+            } catch {
+              // In case of multiple exceptions, only last one will be thrown
+              case ioe: IOException => savedIOException = ioe
+            }
+          }
+          if (savedIOException != null) {
+            throw savedIOException
+          }
+        }
+      } finally {
+        if (!file.delete()) {
+          // Delete can also fail if the file simply did not exist
+          if (file.exists()) {
+            throw new IOException("Failed to delete: " + file.getAbsolutePath)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This function generates a random map(string, int) of a given size.
+   */
+  private[avro] def generateRandomMap(rand: Random, size: Int): java.util.Map[String, Int] = {
+    val jMap = new util.HashMap[String, Int]()
+    for (i <- 0 until size) {
+      jMap.put(rand.nextString(5), i)
+    }
+    jMap
+  }
+
+  /**
+   * This function generates a random array of booleans of a given size.
+   */
+  private[avro] def generateRandomArray(rand: Random, size: Int): util.ArrayList[Boolean] = {
+    val vec = new util.ArrayList[Boolean]()
+    for (i <- 0 until size) {
+      vec.add(rand.nextBoolean())
+    }
+    vec
+  }
+
+  /**
+   * This function generates a random ByteBuffer of a given size.
+   */
+  private[avro] def generateRandomByteBuffer(rand: Random, size: Int): ByteBuffer = {
+    val bb = ByteBuffer.allocate(size)
+    val arrayOfBytes = new Array[Byte](size)
+    rand.nextBytes(arrayOfBytes)
+    bb.put(arrayOfBytes)
+    // ByteBuffer.put advances the position to the end of the written bytes;
+    // rewind so callers reading through the ByteBuffer API start at position 0.
+    bb.rewind()
+    bb
+  }
+}
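`withTempDir` above is the classic loan pattern: acquire a resource, lend it to the caller, and guarantee cleanup in `finally`. A minimal JDK-only sketch of the same idea (using `java.nio.file.Files.createTempDirectory` instead of Guava's `Files.createTempDir`, and keeping the directory alive for the body):

```scala
import java.io.File
import java.nio.file.Files

// Loan pattern: create a temp directory, lend it to the test body,
// and delete it afterwards even if the body throws.
def withTempDir(f: File => Unit): Unit = {
  val dir = Files.createTempDirectory("avro-test-").toFile
  // Minimal recursive delete; assumes no symlinks inside the directory.
  def deleteRecursively(file: File): Unit = {
    Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
    file.delete()
  }
  try f(dir) finally deleteRecursively(dir)
}
```

A plain loan function like this keeps the test helpers framework-agnostic; a ScalaTest fixture could serve the same purpose but would tie the utility to one test framework.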
diff --git a/pom.xml b/pom.xml
index 6dee6fc..0392923 100644 (file)
--- a/pom.xml
+++ b/pom.xml
     <module>external/kafka-0-10</module>
     <module>external/kafka-0-10-assembly</module>
     <module>external/kafka-0-10-sql</module>
+    <module>external/avro</module>
     <!-- See additional modules enabled by profiles below -->
   </modules>
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f887e45..247b6fe 100644 (file)
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -40,8 +40,8 @@ object BuildCommons {
 
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
-  val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010) = Seq(
-    "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
+  val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010, avro) = Seq(
+    "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10", "avro"
   ).map(ProjectRef(buildLocation, _))
 
   val streamingProjects@Seq(streaming, streamingKafka010) =
@@ -326,7 +326,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
-      unsafe, tags, sqlKafka010, kvstore
+      unsafe, tags, sqlKafka010, kvstore, avro
     ).contains(x)
   }
 
@@ -688,9 +688,11 @@ object Unidoc {
     publish := {},
 
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes,
+        yarn, tags, streamingKafka010, sqlKafka010, avro),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes,
+        yarn, tags, streamingKafka010, sqlKafka010, avro),
 
     unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
       ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
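Since the new module lives under `external/` (alongside the Kafka connectors in the module list above), it ships as its own artifact rather than inside the main Spark jar. A hypothetical dependency sketch for the targeted 2.4 release — the artifact id, Scala binary version, and version number are assumptions, not part of this diff:

```xml
<!-- Assumed coordinates; not part of this change -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.11</artifactId>
  <version>2.4.0</version>
</dependency>
```

At runtime the same artifact can be pulled in with `spark-submit --packages`, after which `spark.read.format("avro")` resolves through the `DataSourceRegister` service file added by this change.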