SQOOP-3328: Implement an alternative solution for Parquet reading and writing
author Boglarka Egyed <bogi@apache.org>
Thu, 28 Jun 2018 14:41:01 +0000 (16:41 +0200)
committer Boglarka Egyed <bogi@apache.org>
Thu, 28 Jun 2018 14:41:01 +0000 (16:41 +0200)
(Szabolcs Vasas via Boglarka Egyed)

40 files changed:
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/avro/AvroUtil.java
src/java/org/apache/sqoop/manager/ConnManager.java
src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopMergeParquetReducer.java [moved from src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java with 60% similarity]
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetJobConfiguratorFactory.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetMergeJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/java/org/apache/sqoop/tool/MergeTool.java
src/test/org/apache/sqoop/TestBigDecimalExport.java
src/test/org/apache/sqoop/TestMerge.java
src/test/org/apache/sqoop/TestParquetExport.java
src/test/org/apache/sqoop/TestParquetImport.java
src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java [new file with mode: 0644]
src/test/org/apache/sqoop/TestSqoopOptions.java
src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
src/test/org/apache/sqoop/testutil/ImportJobTestCase.java
src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
src/test/org/apache/sqoop/util/ParquetReader.java

index d9984af..3a19aea 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.accumulo.AccumuloConstants;
 import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
 import org.apache.sqoop.tool.BaseSqoopTool;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
@@ -52,6 +53,7 @@ import org.apache.sqoop.util.RandomHash;
 import org.apache.sqoop.util.StoredAsProperty;
 
 import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier;
 
 /**
@@ -458,6 +460,9 @@ public class SqoopOptions implements Cloneable {
   @StoredAsProperty("hs2.keytab")
   private String hs2Keytab;
 
+  @StoredAsProperty("parquet.configurator.implementation")
+  private ParquetJobConfiguratorImplementation parquetConfiguratorImplementation;
+
   public SqoopOptions() {
     initDefaults(null);
   }
@@ -1152,6 +1157,8 @@ public class SqoopOptions implements Cloneable {
 
     // set escape column mapping to true
     this.escapeColumnMappingEnabled = true;
+
+    this.parquetConfiguratorImplementation = KITE;
   }
 
   /**
@@ -2925,5 +2932,12 @@ public class SqoopOptions implements Cloneable {
     this.hs2Keytab = hs2Keytab;
   }
 
+  public ParquetJobConfiguratorImplementation getParquetConfiguratorImplementation() {
+    return parquetConfiguratorImplementation;
+  }
+
+  public void setParquetConfiguratorImplementation(ParquetJobConfiguratorImplementation parquetConfiguratorImplementation) {
+    this.parquetConfiguratorImplementation = parquetConfiguratorImplementation;
+  }
 }
 
index 57c2062..1663b1d 100644 (file)
@@ -40,6 +40,11 @@ import org.apache.sqoop.config.ConfigurationHelper;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
 import org.apache.sqoop.orm.ClassWriter;
+import parquet.avro.AvroSchemaConverter;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -285,24 +290,7 @@ public final class AvroUtil {
    */
   public static Schema getAvroSchema(Path path, Configuration conf)
       throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    Path fileToTest;
-    if (fs.isDirectory(path)) {
-      FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
-        @Override
-        public boolean accept(Path p) {
-          String name = p.getName();
-          return !name.startsWith("_") && !name.startsWith(".");
-        }
-      });
-      if (fileStatuses.length == 0) {
-        return null;
-      }
-      fileToTest = fileStatuses[0].getPath();
-    } else {
-      fileToTest = path;
-    }
-
+    Path fileToTest = getFileToTest(path, conf);
     SeekableInput input = new FsInput(fileToTest, conf);
     DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
     FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
@@ -340,8 +328,37 @@ public final class AvroUtil {
 
     return LogicalTypes.decimal(precision, scale);
   }
+  private static Path getFileToTest(Path path, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    if (!fs.isDirectory(path)) {
+      return path;
+    }
+    FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+    if (fileStatuses.length == 0) {
+      return null;
+    }
+    return fileStatuses[0].getPath();
+  }
 
   public static Schema parseAvroSchema(String schemaString) {
     return new Schema.Parser().parse(schemaString);
   }
+
+  public static Schema getAvroSchemaFromParquetFile(Path path, Configuration conf) throws IOException {
+    Path fileToTest = getFileToTest(path, conf);
+    if (fileToTest == null) {
+      return null;
+    }
+    ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(conf, fileToTest, ParquetMetadataConverter.NO_FILTER);
+
+    MessageType parquetSchema = parquetMetadata.getFileMetaData().getSchema();
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+    return avroSchemaConverter.convert(parquetSchema);
+  }
 }
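
A minimal usage sketch of the new getAvroSchemaFromParquetFile helper added above (not part of this commit; the class name and the import directory path are illustrative):

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.avro.AvroUtil;

    public class ParquetSchemaProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical import directory; hidden files (names starting with _ or .) are skipped.
        Path importDir = new Path("/user/sqoop/parquet_import");
        Schema schema = AvroUtil.getAvroSchemaFromParquetFile(importDir, conf);
        if (schema == null) {
          System.out.println("No data files found under " + importDir);
        } else {
          // The schema is converted from the Parquet footer via AvroSchemaConverter.
          System.out.println(schema.toString(true));
        }
      }
    }
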
index c80dd5d..4c1e8f5 100644 (file)
@@ -46,7 +46,6 @@ import org.apache.sqoop.hive.HiveTypes;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
 import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
 import org.apache.sqoop.util.ExportException;
 import org.apache.sqoop.util.ImportException;
 
@@ -869,7 +868,7 @@ public abstract class ConnManager {
   }
 
   public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
-    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
+    return options.getParquetConfiguratorImplementation().createFactory();
   }
 }
 
index 3b54210..349ca8d 100644 (file)
@@ -49,6 +49,8 @@ import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.apache.sqoop.orm.AvroSchemaGenerator;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
 /**
  * Actually runs a jdbc import job using the ORM files generated by the
  * sqoop.orm package. Uses DataDrivenDBInputFormat.
@@ -114,6 +116,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       Schema schema = generateAvroSchema(tableName, schemaNameOverride);
       Path destination = getContext().getDestination();
 
+      options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
       parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
     }
 
index 17c9ed3..80c0698 100644 (file)
@@ -152,6 +152,7 @@ public class ImportJobBase extends JobBase {
           String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
           if (!shortName.equalsIgnoreCase("default")) {
             conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
+            options.getConf().set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
           }
         }
       }
index ae53a96..63fe92a 100644 (file)
@@ -25,6 +25,8 @@ public final class ParquetConstants {
 
   public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec";
 
+  public static final String PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY = "parquetjob.configurator.implementation";
+
   private ParquetConstants() {
     throw new AssertionError("This class is meant for static use only.");
   }
index 8d7b87f..d4c50a3 100644 (file)
@@ -25,6 +25,10 @@ import org.apache.hadoop.mapreduce.Mapper;
 
 import java.io.IOException;
 
+/**
+ * This interface defines a product type of {@link ParquetJobConfiguratorFactory}.
+ * Implementations of this interface configure Sqoop Parquet export jobs.
+ */
 public interface ParquetExportJobConfigurator {
 
   void configureInputFormat(Job job, Path inputPath) throws IOException;
index fa1bc7d..eb6d08f 100644 (file)
@@ -27,6 +27,10 @@ import org.apache.sqoop.SqoopOptions;
 
 import java.io.IOException;
 
+/**
+ * This interface defines a product type of {@link ParquetJobConfiguratorFactory}.
+ * Implementations of this interface configure Sqoop Parquet import jobs.
+ */
 public interface ParquetImportJobConfigurator {
 
   void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException;
index ed5103f..a97c243 100644 (file)
 
 package org.apache.sqoop.mapreduce.parquet;
 
+/**
+ * This interface is an abstract factory of objects which configure Sqoop Parquet jobs.
+ * Each product is responsible for configuring a different type of Sqoop job.
+ *
+ * @see ParquetImportJobConfigurator
+ * @see ParquetExportJobConfigurator
+ * @see ParquetMergeJobConfigurator
+ */
 public interface ParquetJobConfiguratorFactory {
 
   ParquetImportJobConfigurator createParquetImportJobConfigurator();
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java
new file mode 100644 (file)
index 0000000..050c854
--- /dev/null
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
+
+/**
+ * An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}.
+ * The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects.
+ */
+public enum ParquetJobConfiguratorImplementation {
+  KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class);
+
+  private Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass;
+
+  ParquetJobConfiguratorImplementation(Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass) {
+    this.configuratorFactoryClass = configuratorFactoryClass;
+  }
+
+  public ParquetJobConfiguratorFactory createFactory() {
+    try {
+      return configuratorFactoryClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException("Could not instantiate factory class: " + configuratorFactoryClass, e);
+    }
+  }
+}
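
A short sketch of how the enum doubles as a factory selector (illustrative class name, not part of this commit):

    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;

    public class FactorySelectionSketch {
      public static void main(String[] args) {
        // KITE is the default kept in SqoopOptions; HADOOP selects the new
        // Parquet-library-based configurators introduced by this change.
        ParquetJobConfiguratorFactory factory =
            ParquetJobConfiguratorImplementation.HADOOP.createFactory();
        System.out.println(factory.createParquetImportJobConfigurator().getClass().getName());
      }
    }
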
index 67fdf66..e5fe53f 100644 (file)
@@ -24,6 +24,10 @@ import org.apache.hadoop.mapreduce.Job;
 
 import java.io.IOException;
 
+/**
+ * This interface defines a product type of {@link ParquetJobConfiguratorFactory}.
+ * Implementations of this interface configure Sqoop Parquet merge jobs.
+ */
 public interface ParquetMergeJobConfigurator {
 
   void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException;
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopMergeParquetReducer.java
similarity index 60%
rename from src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
rename to src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopMergeParquetReducer.java
  * limitations under the License.
  */
 
-package org.apache.sqoop.mapreduce.parquet;
+package org.apache.sqoop.mapreduce.parquet.hadoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sqoop.mapreduce.MergeParquetReducer;
 
-public final class ParquetJobConfiguratorFactoryProvider {
+import java.io.IOException;
 
-  private ParquetJobConfiguratorFactoryProvider() {
-    throw new AssertionError("This class is meant for static use only.");
-  }
+/**
+ * An implementation of {@link MergeParquetReducer} which depends on the Hadoop Parquet library.
+ */
+public class HadoopMergeParquetReducer extends MergeParquetReducer<Void, GenericRecord> {
 
-  public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
-    return new KiteParquetJobConfiguratorFactory();
+  @Override
+  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+    context.write(null, record);
   }
-
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportJobConfigurator.java
new file mode 100644 (file)
index 0000000..2180cc2
--- /dev/null
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import parquet.avro.AvroParquetInputFormat;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link ParquetExportJobConfigurator} which depends on the Hadoop Parquet library.
+ */
+public class HadoopParquetExportJobConfigurator implements ParquetExportJobConfigurator {
+
+  @Override
+  public void configureInputFormat(Job job, Path inputPath) throws IOException {
+    // do nothing
+  }
+
+  @Override
+  public Class<? extends Mapper> getMapperClass() {
+    return HadoopParquetExportMapper.class;
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return AvroParquetInputFormat.class;
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetExportMapper.java
new file mode 100644 (file)
index 0000000..f960f21
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.hadoop;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link GenericRecordExportMapper} which depends on the Hadoop Parquet library.
+ */
+public class HadoopParquetExportMapper extends GenericRecordExportMapper<Void, GenericRecord> {
+
+  @Override
+  protected void map(Void key, GenericRecord val, Context context) throws IOException, InterruptedException {
+    context.write(toSqoopRecord(val), NullWritable.get());
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java
new file mode 100644 (file)
index 0000000..3f35faf
--- /dev/null
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.hadoop;
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import parquet.avro.AvroParquetOutputFormat;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
+/**
+ * An implementation of {@link ParquetImportJobConfigurator} which depends on the Hadoop Parquet library.
+ */
+public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfigurator {
+
+  private static final Log LOG = LogFactory.getLog(HadoopParquetImportJobConfigurator.class.getName());
+
+  @Override
+  public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
+    configureAvroSchema(job, schema);
+    configureOutputCodec(job);
+  }
+
+  @Override
+  public Class<? extends Mapper> getMapperClass() {
+    return HadoopParquetImportMapper.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return AvroParquetOutputFormat.class;
+  }
+
+  void configureOutputCodec(Job job) {
+    String outputCodec = job.getConfiguration().get(SQOOP_PARQUET_OUTPUT_CODEC_KEY);
+    if (outputCodec != null) {
+      LOG.info("Using output codec: " + outputCodec);
+      ParquetOutputFormat.setCompression(job, CompressionCodecName.fromConf(outputCodec));
+    }
+  }
+
+  void configureAvroSchema(Job job, Schema schema) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using Avro schema: " + schema);
+    }
+    AvroParquetOutputFormat.setSchema(job, schema);
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportMapper.java
new file mode 100644 (file)
index 0000000..7a66d11
--- /dev/null
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.hadoop;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.lib.LargeObjectLoader;
+import org.apache.sqoop.mapreduce.ParquetImportMapper;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link ParquetImportMapper} which depends on the Hadoop Parquet library.
+ */
+public class HadoopParquetImportMapper extends ParquetImportMapper<NullWritable, GenericRecord> {
+
+  private static final Log LOG = LogFactory.getLog(HadoopParquetImportMapper.class.getName());
+
+  /**
+   * The key to get the configuration value set by
+   * parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+   */
+  private static final String HADOOP_PARQUET_AVRO_SCHEMA_KEY = "parquet.avro.schema";
+
+  @Override
+  protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
+    return new LargeObjectLoader(context.getConfiguration(), FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  @Override
+  protected Schema getAvroSchema(Configuration configuration) {
+    String schemaString = configuration.get(HADOOP_PARQUET_AVRO_SCHEMA_KEY);
+    LOG.debug("Found Avro schema: " + schemaString);
+    return AvroUtil.parseAvroSchema(schemaString);
+  }
+
+  @Override
+  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+    context.write(null, record);
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetJobConfiguratorFactory.java
new file mode 100644 (file)
index 0000000..052ea04
--- /dev/null
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.hadoop;
+
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+
+/**
+ * A concrete factory implementation which produces configurator objects using the Hadoop Parquet library.
+ */
+public class HadoopParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
+
+  @Override
+  public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
+    return new HadoopParquetImportJobConfigurator();
+  }
+
+  @Override
+  public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
+    return new HadoopParquetExportJobConfigurator();
+  }
+
+  @Override
+  public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
+    return new HadoopParquetMergeJobConfigurator();
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetMergeJobConfigurator.java
new file mode 100644 (file)
index 0000000..66ebc5b
--- /dev/null
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.hadoop;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidator;
+import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.mapreduce.MergeParquetMapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+import parquet.avro.AvroParquetInputFormat;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+import static java.util.Collections.singleton;
+import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+/**
+ * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Hadoop Parquet library.
+ */
+public class HadoopParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
+
+  public static final Log LOG = LogFactory.getLog(HadoopParquetMergeJobConfigurator.class.getName());
+
+  private final HadoopParquetImportJobConfigurator importJobConfigurator;
+
+  private final HadoopParquetExportJobConfigurator exportJobConfigurator;
+
+  public HadoopParquetMergeJobConfigurator(HadoopParquetImportJobConfigurator importJobConfigurator, HadoopParquetExportJobConfigurator exportJobConfigurator) {
+    this.importJobConfigurator = importJobConfigurator;
+    this.exportJobConfigurator = exportJobConfigurator;
+  }
+
+  public HadoopParquetMergeJobConfigurator() {
+    this(new HadoopParquetImportJobConfigurator(), new HadoopParquetExportJobConfigurator());
+  }
+
+  @Override
+  public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
+                                       Path finalPath) throws IOException {
+    try {
+      LOG.info("Trying to merge parquet files");
+      job.setOutputKeyClass(Void.class);
+      job.setMapperClass(MergeParquetMapper.class);
+      job.setReducerClass(HadoopMergeParquetReducer.class);
+      job.setOutputValueClass(GenericRecord.class);
+
+      Schema avroSchema = loadAvroSchema(conf, oldPath);
+
+      validateNewPathAvroSchema(getAvroSchemaFromParquetFile(newPath, conf), avroSchema);
+
+      job.setInputFormatClass(exportJobConfigurator.getInputFormatClass());
+      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
+
+      conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
+      importJobConfigurator.configureAvroSchema(job, avroSchema);
+      importJobConfigurator.configureOutputCodec(job);
+      job.setOutputFormatClass(importJobConfigurator.getOutputFormatClass());
+    } catch (Exception cnfe) {
+      throw new IOException(cnfe);
+    }
+  }
+
+  private Schema loadAvroSchema(Configuration conf, Path path) throws IOException {
+    Schema avroSchema = getAvroSchemaFromParquetFile(path, conf);
+
+    if (avroSchema == null) {
+      throw new RuntimeException("Could not load Avro schema from path: " + path);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Avro schema loaded: " + avroSchema);
+    }
+
+    return avroSchema;
+  }
+
+  /**
+   * This method ensures that the Avro schema in the new path is compatible with the Avro schema in the old path.
+   */
+  private void validateNewPathAvroSchema(Schema newPathAvroSchema, Schema avroSchema) {
+    // If the new path is an empty directory (e.g. in case of a sqoop merge command) then the newPathAvroSchema will
+    // be null. In that case we just want to proceed without real validation.
+    if (newPathAvroSchema == null) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(format("Validation Avro schema %s against %s", newPathAvroSchema.toString(), avroSchema.toString()));
+    }
+    SchemaValidator schemaValidator = new SchemaValidatorBuilder().mutualReadStrategy().validateAll();
+    try {
+      schemaValidator.validate(newPathAvroSchema, singleton(avroSchema));
+    } catch (SchemaValidationException e) {
+      throw new RuntimeException("Cannot merge files, the Avro schemas are not compatible.", e);
+    }
+  }
+
+}
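
The schema check above relies on Avro's mutual-read validation. A self-contained sketch of the same check with two hand-written schemas (both schemas and the class name are illustrative, not taken from this commit):

    import static java.util.Collections.singleton;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaValidationException;
    import org.apache.avro.SchemaValidator;
    import org.apache.avro.SchemaValidatorBuilder;

    public class MergeSchemaCheckSketch {
      public static void main(String[] args) {
        // Two example schemas: the second changes the field type, so mutual read
        // compatibility fails, much like an incompatible incremental import would.
        Schema oldSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        Schema newSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        // Same strategy as the merge configurator: each schema must be able to read
        // data written with the other one.
        SchemaValidator validator = new SchemaValidatorBuilder().mutualReadStrategy().validateAll();
        try {
          validator.validate(newSchema, singleton(oldSchema));
          System.out.println("Schemas are compatible, merge can proceed.");
        } catch (SchemaValidationException e) {
          System.out.println("Incompatible schemas: " + e);
        }
      }
    }
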
index 7f21205..02816d7 100644 (file)
@@ -24,6 +24,9 @@ import org.apache.sqoop.mapreduce.MergeParquetReducer;
 
 import java.io.IOException;
 
+/**
+ * An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API.
+ */
 public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
 
   @Override
index ca02c7b..6ebc5a3 100644 (file)
@@ -28,6 +28,9 @@ import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
 
 import java.io.IOException;
 
+/**
+ * An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API.
+ */
 public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
 
   @Override
index 25555d8..122ff3f 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
 import java.io.IOException;
 
 /**
- * Exports Parquet records from a data source.
+ * An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API.
  */
 public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
 
index 87828d1..feb3bf1 100644 (file)
@@ -35,6 +35,9 @@ import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 import java.io.IOException;
 
+/**
+ * An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API.
+ */
 public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
 
   public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
index 20adf6e..0a91e4a 100644 (file)
@@ -30,6 +30,9 @@ import java.io.IOException;
 
 import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
 
+/**
+ * An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API.
+ */
 public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
 
   @Override
index 055e116..bd07c09 100644 (file)
@@ -23,6 +23,9 @@ import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
 import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 
+/**
+ * A concrete factory implementation which produces configurator objects using the Kite Dataset API.
+ */
 public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
 
   @Override
index 9fecf28..ed045cd 100644 (file)
@@ -48,6 +48,9 @@ import java.util.List;
 
 import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
 
+/**
+ * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API.
+ */
 public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
 
   public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
index e68bba9..a4768c9 100644 (file)
@@ -45,7 +45,7 @@ import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_
 import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
 
 /**
- * Helper class for setting up a Parquet MapReduce job.
+ * Helper class using the Kite Dataset API for setting up a Parquet MapReduce job.
  */
 public final class KiteParquetUtils {
 
index c62ee98..8d31832 100644 (file)
@@ -19,6 +19,9 @@
 package org.apache.sqoop.tool;
 
 import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -34,12 +37,11 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.sqoop.manager.SupportedManagers;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.password.CredentialProviderHelper;
@@ -183,6 +185,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
   public static final String THROW_ON_ERROR_ARG = "throw-on-error";
   public static final String ORACLE_ESCAPING_DISABLED = "oracle-escaping-disabled";
   public static final String ESCAPE_MAPPING_COLUMN_NAMES_ENABLED = "escape-mapping-column-names";
+  public static final String PARQUET_CONFIGURATOR_IMPLEMENTATION = "parquet-configurator-implementation";
 
   // Arguments for validation.
   public static final String VALIDATE_ARG = "validate";
@@ -1145,6 +1148,8 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
       out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
           ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
     }
+
+    applyParquetJobConfigurationImplementation(in, out);
   }
 
   private void applyCredentialsOptions(CommandLine in, SqoopOptions out)
@@ -1908,7 +1913,27 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
 
   }
 
-  public ParquetJobConfiguratorFactory getParquetJobConfigurator(Configuration configuration) {
-    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(configuration);
+  private void applyParquetJobConfigurationImplementation(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
+    String optionValue = in.getOptionValue(PARQUET_CONFIGURATOR_IMPLEMENTATION);
+    String propertyValue = out.getConf().get(PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY);
+
+    String valueToUse = isBlank(optionValue) ? propertyValue : optionValue;
+
+    if (isBlank(valueToUse)) {
+      LOG.debug("Parquet job configurator implementation is not set, using default value: " + out.getParquetConfiguratorImplementation());
+      return;
+    }
+
+    try {
+      ParquetJobConfiguratorImplementation parquetConfiguratorImplementation = valueOf(valueToUse.toUpperCase());
+      out.setParquetConfiguratorImplementation(parquetConfiguratorImplementation);
+      LOG.debug("Parquet job configurator implementation set: " + parquetConfiguratorImplementation);
+    } catch (IllegalArgumentException e) {
+      throw new InvalidOptionsException(format("Invalid Parquet job configurator implementation is set: %s. Supported values are: %s", valueToUse, Arrays.toString(ParquetJobConfiguratorImplementation.values())));
+    }
+  }
+
+  public ParquetJobConfiguratorFactory getParquetJobConfigurator(SqoopOptions sqoopOptions) {
+    return sqoopOptions.getParquetConfiguratorImplementation().createFactory();
   }
 }
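
The option handling above resolves the implementation in this order: the --parquet-configurator-implementation command-line option, then the parquetjob.configurator.implementation property, then the KITE default initialized in SqoopOptions. A rough sketch of the property-driven path (illustrative class name, not part of this commit):

    import org.apache.sqoop.SqoopOptions;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;

    public class ImplementationPrecedenceSketch {
      public static void main(String[] args) {
        SqoopOptions options = new SqoopOptions();
        // Default set by initDefaults(): KITE.
        System.out.println(options.getParquetConfiguratorImplementation());

        // Equivalent of -Dparquetjob.configurator.implementation=hadoop when the
        // --parquet-configurator-implementation option is not given on the command line.
        options.getConf().set("parquetjob.configurator.implementation", "hadoop");
        String value = options.getConf().get("parquetjob.configurator.implementation");
        options.setParquetConfiguratorImplementation(
            ParquetJobConfiguratorImplementation.valueOf(value.toUpperCase()));
        System.out.println(options.getParquetConfiguratorImplementation()); // HADOOP
      }
    }
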
index 2c474b7..25c3f70 100644 (file)
@@ -473,7 +473,7 @@ public class ImportTool extends BaseSqoopTool {
           loadJars(options.getConf(), context.getJarFile(), context.getTableName());
         }
 
-        ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
+        ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options).createParquetMergeJobConfigurator();
         MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
         if (mergeJob.runMergeJob()) {
           // Rename destination directory to proper location.
index 4c20f7d..e23b4f1 100644 (file)
@@ -53,7 +53,7 @@ public class MergeTool extends BaseSqoopTool {
   public int run(SqoopOptions options) {
     try {
       // Configure and execute a MapReduce job to merge these datasets.
-      ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
+      ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options).createParquetMergeJobConfigurator();
       MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
       if (!mergeJob.runMergeJob()) {
         LOG.error("MapReduce job failed!");
index ccea173..b579d93 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.ExportJobTestCase;
 import org.junit.Test;
 
+import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -57,7 +58,7 @@ public class TestBigDecimalExport extends ExportJobTestCase {
     writer.close();
     String[] types =
       { "DECIMAL", "NUMERIC" };
-    createTableWithColTypes(types, null);
+    createTableWithColTypes(types, emptyList());
 
     List<String> args = new ArrayList<String>();
 
index 11806fe..2b3280a 100644 (file)
@@ -26,6 +26,8 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
+
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.manager.ConnManager;
@@ -52,6 +54,8 @@ import org.apache.sqoop.util.ParquetReader;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.junit.Assert.fail;
 
 /**
@@ -80,6 +84,8 @@ public class TestMerge extends BaseSqoopTestCase {
       Arrays.asList(new Integer(1), new Integer(43)),
       Arrays.asList(new Integer(3), new Integer(313)));
 
+  private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE;
+
   @Before
   public void setUp() {
     super.setUp();
@@ -112,6 +118,7 @@ public class TestMerge extends BaseSqoopTestCase {
   public SqoopOptions getSqoopOptions(Configuration conf) {
     SqoopOptions options = new SqoopOptions(conf);
     options.setConnectString(HsqldbTestServer.getDbUrl());
+    options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation);
 
     return options;
   }
@@ -157,7 +164,14 @@ public class TestMerge extends BaseSqoopTestCase {
   }
 
   @Test
-  public void testParquetFileMerge() throws Exception {
+  public void testParquetFileMergeHadoop() throws Exception {
+    parquetJobConfiguratorImplementation = HADOOP;
+    runMergeTest(SqoopOptions.FileLayout.ParquetFile);
+  }
+
+  @Test
+  public void testParquetFileMergeKite() throws Exception {
+    parquetJobConfiguratorImplementation = KITE;
     runMergeTest(SqoopOptions.FileLayout.ParquetFile);
   }
 
index 43dabb5..0fab188 100644 (file)
@@ -32,6 +32,8 @@ import org.junit.Rule;
 
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import parquet.avro.AvroParquetWriter;
 
 import java.io.IOException;
@@ -42,6 +44,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
@@ -55,11 +58,23 @@ import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
 /**
  * Test that we can export Parquet Data Files from HDFS into databases.
  */
+@RunWith(Parameterized.class)
 public class TestParquetExport extends ExportJobTestCase {
 
+  @Parameterized.Parameters(name = "parquetImplementation = {0}")
+  public static Iterable<? extends Object> parquetImplementationParameters() {
+    return Arrays.asList("kite", "hadoop");
+  }
+
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  private final String parquetImplementation;
+
+  public TestParquetExport(String parquetImplementation) {
+    this.parquetImplementation = parquetImplementation;
+  }
+
   /**
    * @return an argv for the CodeGenTool to use when creating tables to export.
    */
@@ -478,5 +493,10 @@ public class TestParquetExport extends ExportJobTestCase {
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
   }
 
-
+  @Override
+  protected Configuration getConf() {
+    Configuration conf = super.getConf();
+    conf.set("parquetjob.configurator.implementation", parquetImplementation);
+    return conf;
+  }
 }
index 27d407a..b1488e8 100644 (file)
@@ -19,7 +19,6 @@
 package org.apache.sqoop;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -32,12 +31,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.util.ParquetReader;
 import org.junit.Test;
-import parquet.avro.AvroSchemaConverter;
-import parquet.format.CompressionCodec;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.MessageType;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -46,20 +42,38 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests --as-parquetfile.
  */
+@RunWith(Parameterized.class)
 public class TestParquetImport extends ImportJobTestCase {
 
   public static final Log LOG = LogFactory
       .getLog(TestParquetImport.class.getName());
 
+  private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite";
+
+  private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop";
+
+  @Parameters(name = "parquetImplementation = {0}")
+  public static Iterable<? extends Object> parquetImplementationParameters() {
+    return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP);
+  }
+
+  private final String parquetImplementation;
+
+  public TestParquetImport(String parquetImplementation) {
+    this.parquetImplementation = parquetImplementation;
+  }
+
   /**
    * Create the argv to pass to Sqoop.
    *
@@ -122,12 +136,30 @@ public class TestParquetImport extends ImportJobTestCase {
   }
 
   @Test
-  public void testDeflateCompression() throws IOException {
+  public void testHadoopGzipCompression() throws IOException {
+    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
+    runParquetImportTest("gzip");
+  }
+
+  @Test
+  public void testKiteDeflateCompression() throws IOException {
+    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation));
     // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
     // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
     runParquetImportTest("deflate", "gzip");
   }
 
+  /**
+   * This test case documents that the deflate codec is not supported with
+   * the Hadoop Parquet implementation, so Sqoop throws an exception when it is specified.
+   * @throws IOException
+   */
+  @Test(expected = IOException.class)
+  public void testHadoopDeflateCompression() throws IOException {
+    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
+    runParquetImportTest("deflate");
+  }
+
   private void runParquetImportTest(String codec) throws IOException {
     runParquetImportTest(codec, codec);
   }
@@ -141,9 +173,10 @@ public class TestParquetImport extends ImportJobTestCase {
     String [] extraArgs = { "--compression-codec", codec};
     runImport(getOutputArgv(true, extraArgs));
 
-    assertEquals(expectedCodec.toUpperCase(), getCompressionType());
+    ParquetReader parquetReader = new ParquetReader(getTablePath());
+    assertEquals(expectedCodec.toUpperCase(), parquetReader.getCodec().name());
 
-    Schema schema = getSchema();
+    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
     assertEquals(Type.RECORD, schema.getType());
     List<Field> fields = schema.getFields();
     assertEquals(types.length, fields.size());
@@ -155,7 +188,7 @@ public class TestParquetImport extends ImportJobTestCase {
     checkField(fields.get(5), "DATA_COL5", Type.STRING);
     checkField(fields.get(6), "DATA_COL6", Type.BYTES);
 
-    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    List<GenericRecord> genericRecords = parquetReader.readAll();
     GenericRecord record1 = genericRecords.get(0);
     assertNotNull(record1);
     assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
@@ -181,7 +214,7 @@ public class TestParquetImport extends ImportJobTestCase {
     String [] extraArgs = { "--map-column-java", "DATA_COL0=String"};
     runImport(getOutputArgv(true, extraArgs));
 
-    Schema schema = getSchema();
+    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
     assertEquals(Type.RECORD, schema.getType());
     List<Field> fields = schema.getFields();
     assertEquals(types.length, fields.size());
@@ -202,7 +235,7 @@ public class TestParquetImport extends ImportJobTestCase {
 
     runImport(getOutputArgv(true, null));
 
-    Schema schema = getSchema();
+    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
     assertEquals(Type.RECORD, schema.getType());
     List<Field> fields = schema.getFields();
     assertEquals(types.length, fields.size());
@@ -223,7 +256,7 @@ public class TestParquetImport extends ImportJobTestCase {
 
     runImport(getOutputArgv(true, null));
 
-    Schema schema = getSchema();
+    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
     assertEquals(Type.RECORD, schema.getType());
     List<Field> fields = schema.getFields();
     assertEquals(types.length, fields.size());
@@ -295,29 +328,6 @@ public class TestParquetImport extends ImportJobTestCase {
     }
   }
 
-  private String getCompressionType() {
-    ParquetMetadata parquetMetadata = getOutputMetadata();
-    CompressionCodec parquetCompressionCodec = parquetMetadata.getBlocks().get(0).getColumns().get(0).getCodec().getParquetCompressionCodec();
-    return parquetCompressionCodec.name();
-  }
-
-  private ParquetMetadata getOutputMetadata() {
-    try {
-      Configuration config = new Configuration();
-      FileStatus fileStatus = getTablePath().getFileSystem(config).getFileStatus(getTablePath());
-      List<Footer> footers = ParquetFileReader.readFooters(config, fileStatus, false);
-      return footers.get(0).getParquetMetadata();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Schema getSchema() {
-    MessageType parquetSchema = getOutputMetadata().getFileMetaData().getSchema();
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-    return avroSchemaConverter.convert(parquetSchema);
-  }
-
   private void checkField(Field field, String name, Type type) {
     assertEquals(name, field.name());
     assertEquals(Type.UNION, field.schema().getType());
@@ -325,4 +335,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(type, field.schema().getTypes().get(1).getType());
   }
 
+  @Override
+  protected Configuration getConf() {
+    Configuration conf = super.getConf();
+    conf.set("parquetjob.configurator.implementation", parquetImplementation);
+    return conf;
+  }
 }
diff --git a/src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java b/src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java
new file mode 100644 (file)
index 0000000..d8d3af4
--- /dev/null
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
+
+public class TestParquetIncrementalImportMerge extends ImportJobTestCase {
+
+  private static final String[] TEST_COLUMN_TYPES = {"INTEGER", "VARCHAR(32)", "CHAR(64)", "TIMESTAMP"};
+
+  private static final String[] ALTERNATIVE_TEST_COLUMN_TYPES = {"INTEGER", "INTEGER", "INTEGER", "TIMESTAMP"};
+
+  private static final String INITIAL_RECORDS_TIMESTAMP = "2018-06-14 15:00:00.000";
+
+  private static final String NEW_RECORDS_TIMESTAMP = "2018-06-14 16:00:00.000";
+
+  private static final List<List<Object>> INITIAL_RECORDS = Arrays.<List<Object>>asList(
+      Arrays.<Object>asList(2006, "Germany", "Italy", INITIAL_RECORDS_TIMESTAMP),
+      Arrays.<Object>asList(2014, "Brazil", "Hungary", INITIAL_RECORDS_TIMESTAMP)
+  );
+
+  private static final List<Object> ALTERNATIVE_INITIAL_RECORD = Arrays.<Object>asList(1, 2, 3, INITIAL_RECORDS_TIMESTAMP);
+
+  private static final List<List<Object>> NEW_RECORDS = Arrays.<List<Object>>asList(
+      Arrays.<Object>asList(2010, "South Africa", "Spain", NEW_RECORDS_TIMESTAMP),
+      Arrays.<Object>asList(2014, "Brazil", "Germany", NEW_RECORDS_TIMESTAMP)
+  );
+
+  private static final List<String> EXPECTED_MERGED_RECORDS = asList(
+      "2006,Germany,Italy," + timeFromString(INITIAL_RECORDS_TIMESTAMP),
+      "2010,South Africa,Spain," + timeFromString(NEW_RECORDS_TIMESTAMP),
+      "2014,Brazil,Germany," + timeFromString(NEW_RECORDS_TIMESTAMP)
+  );
+
+  private static final List<String> EXPECTED_INITIAL_RECORDS = asList(
+      "2006,Germany,Italy," + timeFromString(INITIAL_RECORDS_TIMESTAMP),
+      "2014,Brazil,Hungary," + timeFromString(INITIAL_RECORDS_TIMESTAMP)
+  );
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Override
+  public void setUp() {
+    super.setUp();
+
+    createTableWithRecords(TEST_COLUMN_TYPES, INITIAL_RECORDS);
+  }
+
+  @Test
+  public void testSimpleMerge() throws Exception {
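+    // Initial full import, then an incremental 'lastmodified' import merged on
+    // the merge key: the re-imported 2014 row replaces the originally imported one.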
+    String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString()).build();
+    runImport(args);
+
+    clearTable(getTableName());
+
+    insertRecordsIntoTable(TEST_COLUMN_TYPES, NEW_RECORDS);
+    args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(), getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
+    runImport(args);
+
+    List<String> result = new ParquetReader(getTablePath()).readAllInCsvSorted();
+
+    assertEquals(EXPECTED_MERGED_RECORDS, result);
+  }
+
+  @Test
+  public void testMergeWhenTheIncrementalImportDoesNotImportAnyRows() throws Exception {
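+    // The table is emptied before the incremental run, so the merge job receives
+    // no new rows and the initially imported records must be left untouched.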
+    String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString()).build();
+    runImport(args);
+
+    clearTable(getTableName());
+
+    args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(), getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
+    runImport(args);
+
+    List<String> result = new ParquetReader(getTablePath()).readAllInCsvSorted();
+
+    assertEquals(EXPECTED_INITIAL_RECORDS, result);
+  }
+
+  @Test
+  public void testMergeWithIncompatibleSchemas() throws Exception {
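+    // The second table uses different column types, so the Avro schemas of the
+    // existing and the newly imported Parquet files cannot be merged.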
+    String targetDir = getWarehouseDir() + "/testMergeWithIncompatibleSchemas";
+    String[] args = initialImportArgs(getConnectString(), getTableName(), targetDir).build();
+    runImport(args);
+
+    incrementTableNum();
+    createTableWithColTypes(ALTERNATIVE_TEST_COLUMN_TYPES, ALTERNATIVE_INITIAL_RECORD);
+
+    args = incrementalImportArgs(getConnectString(), getTableName(), targetDir, getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
+
+    expectedException.expectMessage("Cannot merge files, the Avro schemas are not compatible.");
+    runImportThrowingException(args);
+  }
+
+  @Test
+  public void testMergedFilesHaveCorrectCodec() throws Exception {
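+    // The initial import is Snappy compressed, the incremental one uses gzip;
+    // after the merge the files in the target directory must be gzip compressed.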
+    String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString())
+        .withOption("compression-codec", "snappy")
+        .build();
+    runImport(args);
+
+    args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(), getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP)
+        .withOption("compression-codec", "gzip")
+        .build();
+    runImport(args);
+
+    CompressionCodecName compressionCodec = new ParquetReader(getTablePath()).getCodec();
+    assertEquals(GZIP, compressionCodec);
+  }
+
+  private ArgumentArrayBuilder initialImportArgs(String connectString, String tableName, String targetDir) {
+    return new ArgumentArrayBuilder()
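+        // force the Hadoop-based Parquet job configurator implementation for these tests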
+        .withProperty("parquetjob.configurator.implementation", "hadoop")
+        .withOption("connect", connectString)
+        .withOption("table", tableName)
+        .withOption("num-mappers", "1")
+        .withOption("target-dir", targetDir)
+        .withOption("as-parquetfile");
+  }
+
+  private ArgumentArrayBuilder incrementalImportArgs(String connectString, String tableName, String targetDir, String checkColumn, String mergeKey, String lastValue) {
+    return initialImportArgs(connectString, tableName, targetDir)
+        .withOption("incremental", "lastmodified")
+        .withOption("check-column", checkColumn)
+        .withOption("merge-key", mergeKey)
+        .withOption("last-value", lastValue);
+  }
+
+  private static long timeFromString(String timeStampString) {
+    try {
+      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+      return format.parse(timeStampString).getTime();
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/TestSqoopOptions.java b/src/test/org/apache/sqoop/TestSqoopOptions.java
index bb7c20d..ba4a4d4 100644 (file)
@@ -90,6 +90,7 @@ public class TestSqoopOptions {
     excludedFieldsFromClone.add("layout");
     excludedFieldsFromClone.add("activeSqoopTool");
     excludedFieldsFromClone.add("hbaseNullIncrementalMode");
+    excludedFieldsFromClone.add("parquetConfiguratorImplementation");
   }
 
   @After
diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java b/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
index f6d591b..3d115ab 100644 (file)
@@ -18,7 +18,6 @@
 
 package org.apache.sqoop.hive;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
 import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration;
@@ -67,7 +66,7 @@ public class TestHiveServer2TextImport extends ImportJobTestCase {
     List<Object> columnValues = Arrays.<Object>asList("test", 42, "somestring");
 
     String[] types = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
-    createTableWithColTypes(types, toStringArray(columnValues));
+    createTableWithColTypes(types, columnValues);
 
     String[] args = new ArgumentArrayBuilder()
         .withProperty(YarnConfiguration.RM_PRINCIPAL, miniKdcInfrastructure.getTestPrincipal())
@@ -83,19 +82,4 @@ public class TestHiveServer2TextImport extends ImportJobTestCase {
     List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
     assertEquals(columnValues, rows.get(0));
   }
-
-  private String[] toStringArray(List<Object> columnValues) {
-    String[] result = new String[columnValues.size()];
-
-    for (int i = 0; i < columnValues.size(); i++) {
-      if (columnValues.get(i) instanceof String) {
-        result[i] = StringUtils.wrap((String) columnValues.get(i), '\'');
-      } else {
-        result[i] = columnValues.get(i).toString();
-      }
-    }
-
-    return result;
-  }
-
 }
diff --git a/src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java b/src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
index a5f85a0..ac6db0b 100644 (file)
@@ -42,7 +42,9 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.List;
 
+import static org.apache.commons.lang3.StringUtils.wrap;
 import static org.junit.Assert.fail;
 
 /**
@@ -427,6 +429,16 @@ public abstract class BaseSqoopTestCase {
     insertIntoTable(null, colTypes, vals);
   }
 
+  protected void insertIntoTable(String[] colTypes, List<Object> record) {
+    insertIntoTable(null, colTypes, toStringArray(record));
+  }
+
+  protected void insertRecordsIntoTable(String[] colTypes, List<List<Object>> records) {
+    for (List<Object> record : records) {
+      insertIntoTable(colTypes, record);
+    }
+  }
+
   protected void insertIntoTable(String[] columns, String[] colTypes, String[] vals) {
     assert colTypes != null;
     assert colTypes.length == vals.length;
@@ -575,6 +587,17 @@ public abstract class BaseSqoopTestCase {
     createTableWithColTypesAndNames(colNames, colTypes, vals);
   }
 
+  protected void createTableWithColTypes(String [] colTypes, List<Object> record) {
+    createTableWithColTypes(colTypes, toStringArray(record));
+  }
+
+  protected void createTableWithRecords(String [] colTypes, List<List<Object>> records) {
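+    // The first record creates the table, the remaining records are inserted into it.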
+    createTableWithColTypes(colTypes, records.get(0));
+    for (int i = 1; i < records.size(); i++) {
+      insertIntoTable(colTypes, records.get(i));
+    }
+  }
+
   /**
    * Create a table with a single column and put a data element in it.
    * @param colType the type of the column to create
@@ -627,4 +650,28 @@ public abstract class BaseSqoopTestCase {
 
     return ObjectArrays.concat(entries, moreEntries, String.class);
   }
+
+  protected void clearTable(String tableName) throws SQLException {
+    String deleteCommand = "DELETE FROM " + tableName;
+    Connection conn = getManager().getConnection();
+    try (PreparedStatement statement = conn.prepareStatement(deleteCommand)) {
+      statement.executeUpdate();
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String[] toStringArray(List<Object> columnValues) {
+    String[] result = new String[columnValues.size()];
+
+    for (int i = 0; i < columnValues.size(); i++) {
+      if (columnValues.get(i) instanceof String) {
+        result[i] = wrap((String) columnValues.get(i), '\'');
+      } else {
+        result[i] = columnValues.get(i).toString();
+      }
+    }
+
+    return result;
+  }
 }
diff --git a/src/test/org/apache/sqoop/testutil/ImportJobTestCase.java b/src/test/org/apache/sqoop/testutil/ImportJobTestCase.java
index dbefe20..7b22c86 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.sqoop.tool.ImportTool;
 import org.apache.sqoop.util.ClassLoaderStack;
 import org.junit.Before;
 
+import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -219,12 +220,7 @@ public abstract class ImportJobTestCase extends BaseSqoopTestCase {
     // run the tool through the normal entry-point.
     int ret;
     try {
-      Configuration conf = getConf();
-      //Need to disable OraOop for existing tests
-      conf.set("oraoop.disabled", "true");
-      SqoopOptions opts = getSqoopOptions(conf);
-      Sqoop sqoop = new Sqoop(tool, conf, opts);
-      ret = Sqoop.runSqoop(sqoop, argv);
+      ret = runSqoopTool(tool, argv, getSqoopOptions(getConf()));
     } catch (Exception e) {
       LOG.error("Got exception running Sqoop: " + e.toString());
       e.printStackTrace();
@@ -237,9 +233,40 @@ public abstract class ImportJobTestCase extends BaseSqoopTestCase {
     }
   }
 
+  private int runSqoopTool(SqoopTool tool, String [] argv, SqoopOptions sqoopOptions) {
+    Configuration conf = getConf();
+    //Need to disable OraOop for existing tests
+    conf.set("oraoop.disabled", "true");
+    Sqoop sqoop = new Sqoop(tool, conf, sqoopOptions);
+
+    return Sqoop.runSqoop(sqoop, argv);
+  }
+
+  protected int runImportThrowingException(SqoopTool tool, String [] argv) {
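+    // Run the tool with rethrowing enabled so failures surface as exceptions;
+    // the previous value of the rethrow property is restored afterwards.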
+    String oldRethrowProperty = System.getProperty(SQOOP_RETHROW_PROPERTY);
+    System.setProperty(SQOOP_RETHROW_PROPERTY, "true");
+
+    SqoopOptions sqoopOptions = getSqoopOptions(getConf());
+    sqoopOptions.setThrowOnError(true);
+
+    try {
+      return runSqoopTool(tool, argv, sqoopOptions);
+    } finally {
+      if (oldRethrowProperty == null) {
+        System.clearProperty(SQOOP_RETHROW_PROPERTY);
+      } else {
+        System.setProperty(SQOOP_RETHROW_PROPERTY, oldRethrowProperty);
+      }
+    }
+  }
+
   /** run an import using the default ImportTool. */
   protected void runImport(String [] argv) throws IOException {
     runImport(new ImportTool(), argv);
   }
 
+  protected void runImportThrowingException(String [] argv) {
+    runImportThrowingException(new ImportTool(), argv);
+  }
+
 }
diff --git a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
index 01ad150..dbda8b7 100644 (file)
@@ -18,15 +18,21 @@
 
 package org.apache.sqoop.tool;
 
+import org.apache.commons.cli.CommandLine;
 import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestBaseSqoopTool {
 
@@ -35,11 +41,13 @@ public class TestBaseSqoopTool {
 
   private BaseSqoopTool testBaseSqoopTool;
   private SqoopOptions testSqoopOptions;
+  private CommandLine mockCommandLine;
 
   @Before
   public void setup() {
     testBaseSqoopTool = mock(BaseSqoopTool.class, Mockito.CALLS_REAL_METHODS);
     testSqoopOptions = new SqoopOptions();
+    mockCommandLine = mock(CommandLine.class);
   }
 
   @Test
@@ -69,4 +77,61 @@ public class TestBaseSqoopTool {
     testBaseSqoopTool.rethrowIfRequired(testSqoopOptions, expectedCauseException);
   }
 
+  @Test
+  public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromCommandLine() throws Exception {
+    ParquetJobConfiguratorImplementation expectedValue = HADOOP;
+
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(expectedValue.toString());
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromCommandLineCaseInsensitively() throws Exception {
+    String hadoopImplementationMixedCase = "haDooP";
+
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(hadoopImplementationMixedCase);
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromConfiguration() throws Exception {
+    ParquetJobConfiguratorImplementation expectedValue = HADOOP;
+    testSqoopOptions.getConf().set("parquetjob.configurator.implementation", expectedValue.toString());
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsPrefersParquetJobConfigurationImplementationFromCommandLine() throws Exception {
+    ParquetJobConfiguratorImplementation expectedValue = HADOOP;
+    testSqoopOptions.getConf().set("parquetjob.configurator.implementation", "kite");
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(expectedValue.toString());
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception {
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid");
+
+    exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]");
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+  }
+
+  @Test
+  public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception {
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
 }
diff --git a/src/test/org/apache/sqoop/util/ParquetReader.java b/src/test/org/apache/sqoop/util/ParquetReader.java
index 56e03a0..f1c2fe1 100644 (file)
@@ -20,8 +20,16 @@ package org.apache.sqoop.util;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import parquet.avro.AvroParquetReader;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.util.HiddenFileFilter;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -29,8 +37,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 
+import static java.util.Arrays.asList;
 import static org.apache.sqoop.util.FileSystemUtil.isFile;
 import static org.apache.sqoop.util.FileSystemUtil.listFiles;
 
@@ -95,6 +105,49 @@ public class ParquetReader implements AutoCloseable {
     return result;
   }
 
+  public List<String> readAllInCsvSorted() {
+    List<String> result = readAllInCsv();
+    Collections.sort(result);
+
+    return result;
+  }
+
+  public CompressionCodecName getCodec() {
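+    // Returns the codec of the first column chunk of the first row group found,
+    // or null if the path contains no Parquet data.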
+    List<Footer> footers = getFooters();
+
+    Iterator<Footer> footersIterator = footers.iterator();
+    if (footersIterator.hasNext()) {
+      Footer footer = footersIterator.next();
+
+      Iterator<BlockMetaData> blockMetaDataIterator = footer.getParquetMetadata().getBlocks().iterator();
+      if (blockMetaDataIterator.hasNext()) {
+        BlockMetaData blockMetaData = blockMetaDataIterator.next();
+
+        Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
+
+        if (columnChunkMetaDataIterator.hasNext()) {
+          ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
+
+          return columnChunkMetaData.getCodec();
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private List<Footer> getFooters() {
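+    // Read the footers of every non-hidden file under pathToRead.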
+    final List<Footer> footers;
+    try {
+      FileSystem fs = pathToRead.getFileSystem(configuration);
+      List<FileStatus> statuses = asList(fs.listStatus(pathToRead, HiddenFileFilter.INSTANCE));
+      footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, false);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return footers;
+  }
+
   private String convertToCsv(GenericRecord record) {
     StringBuilder result = new StringBuilder();
     for (int i = 0; i < record.getSchema().getFields().size(); i++) {