SQOOP-2788: Sqoop2: Parquet support for HdfsConnector
author: Jarek Jarcec Cecho <jarcec@apache.org>
Fri, 5 Feb 2016 20:51:08 +0000 (12:51 -0800)
committer: Jarek Jarcec Cecho <jarcec@apache.org>
Fri, 5 Feb 2016 20:51:08 +0000 (12:51 -0800)
(Abraham Fine via Jarek Jarcec Cecho)

16 files changed:
common/src/main/resources/org.apache.sqoop.connector-classloader.properties
connector/connector-hdfs/pom.xml
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java [new file with mode: 0644]
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
pom.xml
test/pom.xml
test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java [new file with mode: 0644]

index c0082cc..0311f88 100644 (file)
@@ -52,6 +52,8 @@ system.classes.default=java.,\
   org.apache.log4j.,\
   org.apache.sqoop.,\
   -org.apache.sqoop.connector.,\
+  org.apache.avro.,\
+  org.codehaus.jackson.,\
   org.xerial.snappy.,\
   sqoop.properties,\
   sqoop_bootstrap.properties
index 5996314..37cf3fa 100644 (file)
@@ -73,6 +73,16 @@ limitations under the License.
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
index 9ef2a05..5973463 100644 (file)
@@ -19,10 +19,14 @@ package org.apache.sqoop.connector.hdfs;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
@@ -33,13 +37,18 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
@@ -55,6 +64,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
 
   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
 
+  // the sequence of bytes that appears at the beginning and end of every
+  // parquet file
+  private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+
   private Configuration conf = new Configuration();
   private DataWriter dataWriter;
   private Schema schema;
@@ -85,7 +98,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   private void extractFile(LinkConfiguration linkConfiguration,
                            FromJobConfiguration fromJobConfiguration,
                            Path file, long start, long length, String[] locations)
-      throws IOException {
+    throws IOException, InterruptedException {
     long end = start + length;
     LOG.info("Extracting file " + file);
     LOG.info("\t from offset " + start);
@@ -93,8 +106,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
       extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
-    } else {
-      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
+    } else if(isParquetFile(file)) {
+      extractParquetFile(linkConfiguration, fromJobConfiguration, file, start, length, locations);
+      } else {
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
     }
   }
 
@@ -136,7 +151,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   @SuppressWarnings("resource")
   private void extractTextFile(LinkConfiguration linkConfiguration,
                                FromJobConfiguration fromJobConfiguration,
-                               Path file, long start, long length, String[] locations)
+                               Path file, long start, long length)
       throws IOException {
     LOG.info("Extracting text file");
     long end = start + length;
@@ -185,6 +200,35 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     filestream.close();
   }
 
+  private void extractParquetFile(LinkConfiguration linkConfiguration,
+                                  FromJobConfiguration fromJobConfiguration,
+                                  Path file, long start, long length,
+                                  String[] locations) throws IOException, InterruptedException {
+    // Parquet does not expose a way to directly deal with file splits
+    // except through the ParquetInputFormat (ParquetInputSplit is @private)
+    FileSplit fileSplit = new FileSplit(file, start, length, locations);
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, AvroReadSupport.class.getName());
+    ParquetInputFormat parquetInputFormat = new ParquetInputFormat();
+
+    // ParquetReader needs a TaskAttemptContext to pass through the
+    // configuration object.
+    TaskAttemptContext taskAttemptContext = new SqoopTaskAttemptContext(conf);
+
+    RecordReader<Void, GenericRecord> recordReader = parquetInputFormat.createRecordReader(fileSplit, taskAttemptContext);
+    recordReader.initialize(fileSplit, taskAttemptContext);
+
+    AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema);
+    while (recordReader.nextKeyValue() != false) {
+      GenericRecord record = recordReader.getCurrentValue();
+      rowsRead++;
+      if (schema instanceof ByteArraySchema) {
+        dataWriter.writeArrayRecord(new Object[]{idf.toObject(record)});
+      } else {
+        dataWriter.writeArrayRecord(idf.toObject(record));
+      }
+    }
+  }
+
   @Override
   public long getRowsRead() {
     return rowsRead;
@@ -207,6 +251,41 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     return true;
   }
 
+  private boolean isParquetFile(Path file) {
+    try {
+      FileSystem fileSystem = file.getFileSystem(conf);
+      FileStatus fileStatus = fileSystem.getFileStatus(file);
+      FSDataInputStream fsDataInputStream = fileSystem.open(file);
+
+      long fileLength = fileStatus.getLen();
+
+      byte[] fileStart = new byte[PARQUET_MAGIC.length];
+      fsDataInputStream.readFully(fileStart);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.error("file start: " + new String(fileStart, Charset.forName("ASCII")));
+      }
+
+      if (!Arrays.equals(fileStart, PARQUET_MAGIC)) {
+        return false;
+      }
+
+      long fileEndIndex = fileLength - PARQUET_MAGIC.length;
+      fsDataInputStream.seek(fileEndIndex);
+
+      byte[] fileEnd = new byte[PARQUET_MAGIC.length];
+      fsDataInputStream.readFully(fileEnd);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.error("file end: " + new String(fileEnd, Charset.forName("ASCII")));
+      }
+
+      return Arrays.equals(fileEnd, PARQUET_MAGIC);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
     if (schema instanceof ByteArraySchema) {
       dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
index 5de20c6..7cef93c 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
 import org.apache.sqoop.error.code.HdfsConnectorError;
@@ -89,7 +90,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
           GenericHdfsWriter filewriter = getWriter(toJobConfig);
 
-          filewriter.initialize(filepath, conf, codec);
+          filewriter.initialize(filepath, context.getSchema(), conf, codec);
 
       if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
         String record;
@@ -119,8 +120,14 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   }
 
   private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) {
-    return (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) ? new HdfsSequenceWriter()
-        : new HdfsTextWriter();
+    switch(toJobConf.toJobConfig.outputFormat) {
+      case SEQUENCE_FILE:
+        return new HdfsSequenceWriter();
+      case PARQUET_FILE:
+        return new HdfsParquetWriter();
+      default:
+        return new HdfsTextWriter();
+    }
   }
 
   private String getCompressionCodecName(ToJobConfiguration toJobConf) {
@@ -151,11 +158,16 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
   //TODO: We should probably support configurable extensions at some point
   private static String getExtension(ToJobConfiguration toJobConf, CompressionCodec codec) {
-    if (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE)
-      return ".seq";
-    if (codec == null)
-      return ".txt";
-    return codec.getDefaultExtension();
+    switch(toJobConf.toJobConfig.outputFormat) {
+      case SEQUENCE_FILE:
+        return ".seq";
+      case PARQUET_FILE:
+        return ".parquet";
+      default:
+        if (codec == null)
+          return ".txt";
+        return codec.getDefaultExtension();
+    }
   }
 
   /* (non-Javadoc)
index 2ccccc4..31023e7 100644 (file)
@@ -20,12 +20,13 @@ package org.apache.sqoop.connector.hdfs.hdfsWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.schema.Schema;
 
 import java.io.IOException;
 
 public abstract class GenericHdfsWriter {
 
-  public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
+  public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException;
 
   public abstract void write(String csv) throws IOException;
 
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java
new file mode 100644 (file)
index 0000000..4ec813b
--- /dev/null
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
+import org.apache.sqoop.schema.Schema;
+
+import java.io.IOException;
+
+public class HdfsParquetWriter extends GenericHdfsWriter {
+
+  private ParquetWriter avroParquetWriter;
+  private Schema sqoopSchema;
+  private AVROIntermediateDataFormat avroIntermediateDataFormat;
+
+  @Override
+  public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec hadoopCodec) throws IOException {
+    sqoopSchema = schema;
+    avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema);
+
+    CompressionCodecName parquetCodecName;
+    if (hadoopCodec == null) {
+      parquetCodecName = CompressionCodecName.UNCOMPRESSED;
+    } else {
+      parquetCodecName = CompressionCodecName.fromCompressionCodec(hadoopCodec.getClass());
+    }
+
+    avroParquetWriter =
+      AvroParquetWriter.builder(filepath)
+        .withSchema(avroIntermediateDataFormat.getAvroSchema())
+        .withCompressionCodec(parquetCodecName)
+        .withConf(conf).build();
+
+  }
+
+  @Override
+  public void write(String csv) throws IOException {
+    avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv));
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    avroParquetWriter.close();
+  }
+}
index 75c2e7e..dcce861 100644 (file)
@@ -23,16 +23,17 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.schema.Schema;
 
 import java.io.IOException;
 
-public class HdfsSequenceWriter  extends GenericHdfsWriter {
+public class HdfsSequenceWriter extends GenericHdfsWriter {
 
   private SequenceFile.Writer filewriter;
   private Text text;
 
   @SuppressWarnings("deprecation")
-  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+  public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
     if (codec != null) {
       filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
               conf, filepath, Text.class, NullWritable.class,
index 78cf973..384e330 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.sqoop.connector.hdfs.HdfsConstants;
+import org.apache.sqoop.schema.Schema;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
@@ -34,7 +35,7 @@ public class HdfsTextWriter extends GenericHdfsWriter {
   private BufferedWriter filewriter;
 
   @Override
-  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+  public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException {
     FileSystem fs = filepath.getFileSystem(conf);
 
     DataOutputStream filestream = fs.create(filepath, false);
index adede3a..cbd555a 100644 (file)
@@ -17,9 +17,6 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
-import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -27,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -35,11 +33,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
@@ -47,13 +51,18 @@ import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
 import org.apache.sqoop.schema.type.Text;
-import org.testng.annotations.AfterMethod;
+import org.apache.sqoop.utils.ClassUtils;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.PARQUET_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE;
+
 public class TestLoader extends TestHdfsBase {
   private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
   private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
@@ -63,6 +72,7 @@ public class TestLoader extends TestHdfsBase {
   private final String outputDirectory;
   private Loader loader;
   private String user = "test_user";
+  private Schema schema;
 
   @Factory(dataProvider="test-hdfs-loader")
   public TestLoader(ToFormat outputFormat,
@@ -80,9 +90,10 @@ public class TestLoader extends TestHdfsBase {
     for (ToCompression compression : new ToCompression[]{
         ToCompression.DEFAULT,
         ToCompression.BZIP2,
+        ToCompression.GZIP,
         ToCompression.NONE
     }) {
-      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE, PARQUET_FILE}) {
         parameters.add(new Object[]{outputFileType, compression});
       }
     }
@@ -100,7 +111,7 @@ public class TestLoader extends TestHdfsBase {
   @Test
   public void testLoader() throws Exception {
     FileSystem fs = FileSystem.get(new Configuration());
-    Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
+    schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
         .addColumn(new FloatingPoint("col2", 4L))
         .addColumn(new Text("col3"));
 
@@ -130,14 +141,22 @@ public class TestLoader extends TestHdfsBase {
         assertTestUser(user);
         return null;
       }
-    }, null, user);
+    }, schema, user);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.compression = compression;
     jobConf.toJobConfig.outputFormat = outputFormat;
     Path outputPath = new Path(outputDirectory);
 
-    loader.load(context, linkConf, jobConf);
+    try {
+      loader.load(context, linkConf, jobConf);
+    } catch (Exception e) {
+      // we expect the load to fail when the selected compression format is
+      // not supported by the output format
+      Assert.assertTrue(compressionNotSupported());
+      return;
+    }
+
     Assert.assertEquals(1, fs.listStatus(outputPath).length);
 
     for (FileStatus status : fs.listStatus(outputPath)) {
@@ -152,10 +171,26 @@ public class TestLoader extends TestHdfsBase {
     Assert.assertEquals(5, fs.listStatus(outputPath).length);
   }
 
+  private boolean compressionNotSupported() {
+    switch (outputFormat) {
+      case SEQUENCE_FILE:
+        return compression == ToCompression.GZIP;
+      case PARQUET_FILE:
+        return compression == ToCompression.BZIP2 || compression == ToCompression.DEFAULT;
+    }
+    return false;
+  }
+
   @Test
   public void testOverrideNull() throws Exception {
+    // Parquet supports an actual "null" value so overriding null would not make
+    // sense here
+    if (outputFormat == PARQUET_FILE) {
+      return;
+    }
+
     FileSystem fs = FileSystem.get(new Configuration());
-    Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
+    schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true))
         .addColumn(new FloatingPoint("col2", 8L))
         .addColumn(new Text("col3"))
         .addColumn(new Text("col4"));
@@ -199,7 +234,15 @@ public class TestLoader extends TestHdfsBase {
     jobConf.toJobConfig.nullValue = "\\N";
     Path outputPath = new Path(outputDirectory);
 
-    loader.load(context, linkConf, jobConf);
+    try {
+      loader.load(context, linkConf, jobConf);
+    } catch (Exception e) {
+      // we expect the load to fail when the selected compression format is
+      // not supported by the output format
+      assert(compressionNotSupported());
+      return;
+    }
+
     Assert.assertEquals(1, fs.listStatus(outputPath).length);
 
     for (FileStatus status : fs.listStatus(outputPath)) {
@@ -214,7 +257,7 @@ public class TestLoader extends TestHdfsBase {
     Assert.assertEquals(5, fs.listStatus(outputPath).length);
   }
 
-  private void verifyOutput(FileSystem fs, Path file, String format) throws IOException {
+  private void verifyOutput(FileSystem fs, Path file, String format) throws Exception {
     Configuration conf = new Configuration();
     FSDataInputStream fsin = fs.open(file);
     CompressionCodec codec;
@@ -228,7 +271,9 @@ public class TestLoader extends TestHdfsBase {
           case BZIP2:
             Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
             break;
-
+          case GZIP:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1);
+            break;
           case DEFAULT:
             if(org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b1\\.\\d\\.\\d")) {
               Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
@@ -283,10 +328,46 @@ public class TestLoader extends TestHdfsBase {
           line = new org.apache.hadoop.io.Text();
         }
         break;
+      case PARQUET_FILE:
+        String compressionCodecClassName = ParquetFileReader.readFooter(conf, file,  ParquetMetadataConverter.NO_FILTER).getBlocks().get(0).getColumns().get(0).getCodec().getHadoopCompressionCodecClassName();
+
+        if (compressionCodecClassName == null) {
+          codec = null;
+        } else {
+          codec = (CompressionCodec) ClassUtils.loadClass(compressionCodecClassName).newInstance();
+        }
+
+        // Verify compression
+        switch(compression) {
+          case GZIP:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1);
+            break;
+
+          case NONE:
+          default:
+            Assert.assertNull(codec);
+            break;
+        }
+
+
+        ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
+        AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat();
+        avroIntermediateDataFormat.setSchema(schema);
+        GenericRecord record;
+        index = 1;
+        while ((record = avroParquetReader.read()) != null) {
+          List<Object> objects = new ArrayList<>();
+          for (int i = 0; i < record.getSchema().getFields().size(); i++) {
+            objects.add(record.get(i));
+          }
+          Assert.assertEquals(SqoopIDFUtils.toText(avroIntermediateDataFormat.toCSV(record)), formatRow(format, index++));
+        }
+
+        break;
     }
   }
 
-  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+  private void verifyOutput(FileSystem fs, Path file) throws Exception {
     verifyOutput(fs, file, "%d,%f,%s");
   }
-}
+}
\ No newline at end of file
index 985149c..89bc0f2 100644 (file)
@@ -43,7 +43,8 @@ public class SqoopAvroUtils {
    * Creates an Avro schema from a Sqoop schema.
    */
   public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
-    String name = sqoopSchema.getName();
+    // Avro schema names cannot contain quote characters, so let's just remove them
+    String name = sqoopSchema.getName().replace("\"", "");
     String doc = sqoopSchema.getNote();
     String namespace = SQOOP_SCHEMA_NAMESPACE;
     Schema schema = Schema.createRecord(name, doc, namespace, false);
index ace1bdf..e409fc1 100644 (file)
@@ -148,7 +148,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     return jars;
   }
 
-  private GenericRecord toAVRO(String csv) {
+  public GenericRecord toAVRO(String csv) {
 
     String[] csvStringArray = parseCSVString(csv);
 
@@ -175,7 +175,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     return avroObject;
   }
 
-  private Object toAVRO(String csvString, Column column) {
+  public Object toAVRO(String csvString, Column column) {
     Object returnValue = null;
 
     switch (column.getType()) {
@@ -232,7 +232,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     return returnValue;
   }
 
-  private GenericRecord toAVRO(Object[] objectArray) {
+  public GenericRecord toAVRO(Object[] objectArray) {
 
     if (objectArray == null) {
       return null;
@@ -311,7 +311,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
   }
 
   @SuppressWarnings("unchecked")
-  private String toCSV(GenericRecord record) {
+  public String toCSV(GenericRecord record) {
     Column[] columns = this.schema.getColumnsArray();
 
     StringBuilder csvString = new StringBuilder();
@@ -387,7 +387,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
   }
 
   @SuppressWarnings("unchecked")
-  private Object[] toObject(GenericRecord record) {
+  public Object[] toObject(GenericRecord record) {
 
     if (record == null) {
       return null;
@@ -459,4 +459,8 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     }
     return object;
   }
+
+  public Schema getAvroSchema() {
+    return avroSchema;
+  }
 }
diff --git a/pom.xml b/pom.xml
index cb8a973..ba0a243 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -124,6 +124,7 @@ limitations under the License.
     <groovy.version>2.4.0</groovy.version>
     <jansi.version>1.7</jansi.version>
     <felix.version>2.4.0</felix.version>
+    <parquet.version>1.8.1</parquet.version>
     <!-- maven plugin versions -->
     <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
   </properties>
@@ -700,6 +701,16 @@ limitations under the License.
         <artifactId>jetty-servlet</artifactId>
         <version>${jetty.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop</artifactId>
+        <version>${parquet.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-avro</artifactId>
+        <version>${parquet.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
index 451352a..134bca1 100644 (file)
@@ -175,6 +175,16 @@ limitations under the License.
       <artifactId>hadoop-common</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+    </dependency>
+
   </dependencies>
 
   <!-- Add classifier name to the JAR name -->
index 3ec4f66..1e8c688 100644 (file)
@@ -20,17 +20,27 @@ package org.apache.sqoop.integration.connector.hdfs;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multiset;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
+import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
 import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.test.asserts.HdfsAsserts;
 import org.apache.sqoop.test.infrastructure.Infrastructure;
 import org.apache.sqoop.test.infrastructure.SqoopTestCase;
@@ -51,6 +61,7 @@ import org.testng.annotations.Test;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 
 @Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
@@ -64,6 +75,9 @@ public class NullValueTest extends SqoopTestCase {
   // The custom nullValue to use (set to null if default)
   private String nullValue;
 
+
+  private Schema sqoopSchema;
+
   @DataProvider(name="nul-value-test")
   public static Object[][] data(ITestContext context) {
     String customNullValue = "^&*custom!@";
@@ -80,12 +94,19 @@ public class NullValueTest extends SqoopTestCase {
   }
 
   @Override
+
   public String getTestName() {
     return methodName + "[" + format.name() + ", " + nullValue + "]";
   }
 
   @BeforeMethod
   public void setup() throws Exception {
+    sqoopSchema = new Schema("cities");
+    sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
+    sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("country"));
+    sqoopSchema.addColumn(new DateTime("some_date", true, false));
+    sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("city"));
+
     createTableCities();
   }
 
@@ -128,6 +149,27 @@ public class NullValueTest extends SqoopTestCase {
         }
         sequenceFileWriter.close();
         break;
+      case PARQUET_FILE:
+        // Parquet file format does not support using custom null values
+        if (usingCustomNullValue()) {
+          return;
+        } else {
+          HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
+
+          Configuration conf = new Configuration();
+          FileSystem.setDefaultUri(conf, hdfsClient.getUri());
+
+          parquetWriter.initialize(
+            new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
+            sqoopSchema, conf, null);
+
+          for (String line : getCsv()) {
+            parquetWriter.write(line);
+          }
+
+          parquetWriter.destroy();
+          break;
+        }
       default:
         Assert.fail();
     }
@@ -166,6 +208,11 @@ public class NullValueTest extends SqoopTestCase {
 
   @Test
   public void testToHdfs() throws Exception {
+    // Parquet file format does not support using custom null values
+    if (usingCustomNullValue() && format == ToFormat.PARQUET_FILE) {
+      return;
+    }
+
     provider.insertRow(getTableName(), 1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
     provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
     provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
@@ -203,16 +250,16 @@ public class NullValueTest extends SqoopTestCase {
 
     executeJob(job);
 
+
+    Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
+    Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+    List<String> notFound = new ArrayList<>();
     switch (format) {
       case TEXT_FILE:
         HdfsAsserts.assertMapreduceOutput(hdfsClient,
           HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv());
-        break;
+        return;
       case SEQUENCE_FILE:
-        Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
-        List<String> notFound = new ArrayList<>();
-        Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
-
         for(Path file : files) {
           SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file);
           SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath);
@@ -224,17 +271,32 @@ public class NullValueTest extends SqoopTestCase {
             }
           }
         }
-        if(!setLines.isEmpty() || !notFound.isEmpty()) {
-          LOG.error("Output do not match expectations.");
-          LOG.error("Expected lines that weren't present in the files:");
-          LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
-          LOG.error("Extra lines in files that weren't expected:");
-          LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
-          Assert.fail("Output do not match expectations.");
+        break;
+      case PARQUET_FILE:
+        AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema);
+        notFound = new LinkedList<>();
+        for (Path file : files) {
+          ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
+          GenericRecord record;
+          while ((record = avroParquetReader.read()) != null) {
+            String recordAsCsv = avroIntermediateDataFormat.toCSV(record);
+            if (!setLines.remove(recordAsCsv)) {
+              notFound.add(recordAsCsv);
+            }
+          }
         }
         break;
       default:
         Assert.fail();
     }
+
+    if(!setLines.isEmpty() || !notFound.isEmpty()) {
+      LOG.error("Output do not match expectations.");
+      LOG.error("Expected lines that weren't present in the files:");
+      LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
+      LOG.error("Extra lines in files that weren't expected:");
+      LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
+      Assert.fail("Output do not match expectations.");
+    }
   }
 }
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java
new file mode 100644 (file)
index 0000000..222c493
--- /dev/null
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class ParquetTest extends SqoopTestCase {
+
+  @AfterMethod
+  public void dropTable() {
+    super.dropTable();  // drop the cities table after each test so methods stay independent
+  }
+
+  @Test
+  public void toParquetTest() throws Exception {
+    createAndLoadTableCities();
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+
+
+    // Set rdbms "FROM" config
+    fillRdbmsFromConfig(job, "id");
+
+    // Fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.PARQUET_FILE);
+
+    saveJob(job);
+    executeJob(job);
+
+    String[] expectedOutput =
+      {"'1','USA','2004-10-23 00:00:00.000','San Francisco'",
+        "'2','USA','2004-10-24 00:00:00.000','Sunnyvale'",
+        "'3','Czech Republic','2004-10-25 00:00:00.000','Brno'",
+        "'4','USA','2004-10-26 00:00:00.000','Palo Alto'"};
+
+
+    Multiset<String> setLines = HashMultiset.create(Arrays.asList(expectedOutput));
+
+    List<String> notFound = new LinkedList<>();
+
+    Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, getMapreduceDirectory());
+    for (Path file : files) {
+      ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build();
+      GenericRecord record;
+      while ((record = avroParquetReader.read()) != null) {
+        String recordAsLine = recordToLine(record);
+        if (!setLines.remove(recordAsLine)) {
+          notFound.add(recordAsLine);
+        }
+      }
+    }
+
+    if (!setLines.isEmpty() || !notFound.isEmpty()) {
+      fail("Output do not match expectations.");
+    }
+  }
+
+  @Test
+  public void fromParquetTest() throws Exception {
+    createTableCities();
+
+    Schema sqoopSchema = new Schema("cities");
+    sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
+    sqoopSchema.addColumn(new Text("country"));
+    sqoopSchema.addColumn(new DateTime("some_date", true, false));
+    sqoopSchema.addColumn(new Text("city"));
+
+    HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
+
+    Configuration conf = new Configuration();
+    FileSystem.setDefaultUri(conf, hdfsClient.getUri());
+
+    parquetWriter.initialize(
+      new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
+      sqoopSchema, conf, null);
+
+    parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'");
+    parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'");
+
+    parquetWriter.destroy();
+
+    parquetWriter.initialize(
+      new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")),
+      sqoopSchema, conf, null);
+
+    parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'");
+    parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'");
+
+    parquetWriter.destroy();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLink);
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(hdfsLink.getName(), rdbmsLink.getName());
+    fillHdfsFromConfig(job);
+    fillRdbmsToConfig(job);
+    saveJob(job);
+
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 4);
+    assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
+    assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+    assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+  }
+
+  public String recordToLine(GenericRecord genericRecord) {
+    String line = "";
+    line += "\'" + String.valueOf(genericRecord.get(0)) + "\',";
+    line += "\'" + String.valueOf(genericRecord.get(1)) + "\',";
+    line += "\'" + new Timestamp((Long)genericRecord.get(2)) + "00\',";
+    line += "\'" + String.valueOf(genericRecord.get(3)) + "\'";
+    return line;
+  }
+
+}