SQOOP-3335: Add Hive support to the new Parquet writing implementation
authorBoglarka Egyed <bogi@apache.org>
Mon, 16 Jul 2018 11:41:12 +0000 (13:41 +0200)
committerBoglarka Egyed <bogi@apache.org>
Mon, 16 Jul 2018 11:41:12 +0000 (13:41 +0200)
(Szabolcs Vasas via Boglarka Egyed)

14 files changed:
src/java/org/apache/sqoop/hive/HiveTypes.java
src/java/org/apache/sqoop/hive/TableDefWriter.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/test/org/apache/sqoop/TestParquetIncrementalImportMerge.java
src/test/org/apache/sqoop/hive/TestHiveServer2ParquetImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestTableDefWriter.java
src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java

index ad00535..554a036 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.sqoop.hive;
 
 import java.sql.Types;
 
+import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -28,6 +29,15 @@ import org.apache.commons.logging.LogFactory;
  */
 public final class HiveTypes {
 
+  private static final String HIVE_TYPE_TINYINT = "TINYINT";
+  private static final String HIVE_TYPE_INT = "INT";
+  private static final String HIVE_TYPE_BIGINT = "BIGINT";
+  private static final String HIVE_TYPE_FLOAT = "FLOAT";
+  private static final String HIVE_TYPE_DOUBLE = "DOUBLE";
+  private static final String HIVE_TYPE_STRING = "STRING";
+  private static final String HIVE_TYPE_BOOLEAN = "BOOLEAN";
+  private static final String HIVE_TYPE_BINARY = "BINARY";
+
   public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
 
   private HiveTypes() { }
@@ -41,7 +51,7 @@ public final class HiveTypes {
       switch (sqlType) {
           case Types.INTEGER:
           case Types.SMALLINT:
-              return "INT";
+              return HIVE_TYPE_INT;
           case Types.VARCHAR:
           case Types.CHAR:
           case Types.LONGVARCHAR:
@@ -52,20 +62,20 @@ public final class HiveTypes {
           case Types.TIME:
           case Types.TIMESTAMP:
           case Types.CLOB:
-              return "STRING";
+              return HIVE_TYPE_STRING;
           case Types.NUMERIC:
           case Types.DECIMAL:
           case Types.FLOAT:
           case Types.DOUBLE:
           case Types.REAL:
-              return "DOUBLE";
+              return HIVE_TYPE_DOUBLE;
           case Types.BIT:
           case Types.BOOLEAN:
-              return "BOOLEAN";
+              return HIVE_TYPE_BOOLEAN;
           case Types.TINYINT:
-              return "TINYINT";
+              return HIVE_TYPE_TINYINT;
           case Types.BIGINT:
-              return "BIGINT";
+              return HIVE_TYPE_BIGINT;
           default:
         // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
         // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
@@ -73,6 +83,29 @@ public final class HiveTypes {
       }
   }
 
+  public static String toHiveType(Schema.Type avroType) {
+      switch (avroType) {
+        case BOOLEAN:
+          return HIVE_TYPE_BOOLEAN;
+        case INT:
+          return HIVE_TYPE_INT;
+        case LONG:
+          return HIVE_TYPE_BIGINT;
+        case FLOAT:
+          return HIVE_TYPE_FLOAT;
+        case DOUBLE:
+          return HIVE_TYPE_DOUBLE;
+        case STRING:
+        case ENUM:
+          return HIVE_TYPE_STRING;
+        case BYTES:
+        case FIXED:
+          return HIVE_TYPE_BINARY;
+        default:
+          return null;
+      }
+  }
+
   /**
    * @return true if a sql type can't be translated to a precise match
    * in Hive, and we have to cast it to something more generic.
index 27d988c..b21dfe5 100644 (file)
@@ -20,24 +20,31 @@ package org.apache.sqoop.hive;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Date;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Properties;
 
+import org.apache.avro.Schema;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.io.CodecMap;
 
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.util.FileSystemUtil;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
 /**
  * Creates (Hive-specific) SQL DDL statements to create tables to hold data
  * we're importing from another source.
@@ -56,6 +63,7 @@ public class TableDefWriter {
   private String inputTableName;
   private String outputTableName;
   private boolean commentsEnabled;
+  private Schema avroSchema;
 
   /**
    * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
@@ -82,6 +90,9 @@ public class TableDefWriter {
    * Get the column names to import.
    */
   private String [] getColumnNames() {
+    if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+      return getColumnNamesFromAvroSchema();
+    }
     String [] colNames = options.getColumns();
     if (null != colNames) {
       return colNames; // user-specified column names.
@@ -92,6 +103,16 @@ public class TableDefWriter {
     }
   }
 
+  private String[] getColumnNamesFromAvroSchema() {
+    List<String> result = new ArrayList<>();
+
+    for (Schema.Field field : getAvroSchema().getFields()) {
+      result.add(field.name());
+    }
+
+    return result.toArray(new String[result.size()]);
+  }
+
   /**
    * @return the CREATE TABLE statement for the table to load into hive.
    */
@@ -108,6 +129,7 @@ public class TableDefWriter {
     }
 
     String [] colNames = getColumnNames();
+    Map<String, Schema.Type> columnNameToAvroType = getColumnNameToAvroTypeMapping();
     StringBuilder sb = new StringBuilder();
     if (options.doFailIfHiveTableExists()) {
       if (isHiveExternalTableSet) {
@@ -158,22 +180,18 @@ public class TableDefWriter {
 
       first = false;
 
-      Integer colType = columnTypes.get(col);
-      String hiveColType = userMapping.getProperty(col);
-      if (hiveColType == null) {
-        hiveColType = connManager.toHiveType(inputTableName, col, colType);
-      }
-      if (null == hiveColType) {
-        throw new IOException("Hive does not support the SQL type for column "
-            + col);
+      String hiveColType;
+      if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+        Integer colType = columnTypes.get(col);
+        hiveColType = getHiveColumnTypeForTextTable(userMapping, col, colType);
+      } else if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+        hiveColType = HiveTypes.toHiveType(columnNameToAvroType.get(col));
+      } else {
+        throw new RuntimeException("File format is not supported for Hive tables.");
       }
 
       sb.append('`').append(col).append("` ").append(hiveColType);
 
-      if (HiveTypes.isHiveTypeImprovised(colType)) {
-        LOG.warn(
-            "Column " + col + " had to be cast to a less precise type in Hive");
-      }
     }
 
     sb.append(") ");
@@ -190,19 +208,23 @@ public class TableDefWriter {
         .append(" STRING) ");
      }
 
-    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
-    sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
-    sb.append("' LINES TERMINATED BY '");
-    sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
-    String codec = options.getCompressionCodec();
-    if (codec != null && (codec.equals(CodecMap.LZOP)
-            || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
-      sb.append("' STORED AS INPUTFORMAT "
-              + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
-      sb.append(" OUTPUTFORMAT "
-              + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+    if (SqoopOptions.FileLayout.ParquetFile.equals(options.getFileLayout())) {
+      sb.append("STORED AS PARQUET");
     } else {
-      sb.append("' STORED AS TEXTFILE");
+      sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
+      sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
+      sb.append("' LINES TERMINATED BY '");
+      sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
+      String codec = options.getCompressionCodec();
+      if (codec != null && (codec.equals(CodecMap.LZOP)
+          || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+        sb.append("' STORED AS INPUTFORMAT "
+            + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+        sb.append(" OUTPUTFORMAT "
+            + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+      } else {
+        sb.append("' STORED AS TEXTFILE");
+      }
     }
 
     if (isHiveExternalTableSet) {
@@ -214,6 +236,50 @@ public class TableDefWriter {
     return sb.toString();
   }
 
+  private Map<String, Schema.Type> getColumnNameToAvroTypeMapping() {
+    if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
+      return Collections.emptyMap();
+    }
+    Map<String, Schema.Type> result = new HashMap<>();
+    Schema avroSchema = getAvroSchema();
+    for (Schema.Field field : avroSchema.getFields()) {
+      result.put(field.name(), getNonNullAvroType(field.schema()));
+    }
+
+    return result;
+  }
+
+  private Schema.Type getNonNullAvroType(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema.getType();
+    }
+
+    for (Schema subSchema : schema.getTypes()) {
+      if (subSchema.getType() != Schema.Type.NULL) {
+        return subSchema.getType();
+      }
+    }
+
+    return null;
+  }
+
+  private String getHiveColumnTypeForTextTable(Properties userMapping, String columnName, Integer columnType) throws IOException {
+    String hiveColType = userMapping.getProperty(columnName);
+    if (hiveColType == null) {
+      hiveColType = connManager.toHiveType(inputTableName, columnName, columnType);
+    }
+    if (null == hiveColType) {
+      throw new IOException("Hive does not support the SQL type for column "
+          + columnName);
+    }
+
+    if (HiveTypes.isHiveTypeImprovised(columnType)) {
+      LOG.warn(
+          "Column " + columnName + " had to be cast to a less precise type in Hive");
+    }
+    return hiveColType;
+  }
+
   /**
    * @return the LOAD DATA statement to import the data in HDFS into hive.
    */
@@ -320,5 +386,14 @@ public class TableDefWriter {
   boolean isCommentsEnabled() {
     return commentsEnabled;
   }
+
+  Schema getAvroSchema() {
+    if (avroSchema == null) {
+      String schemaString = options.getConf().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
+      avroSchema = AvroUtil.parseAvroSchema(schemaString);
+    }
+
+    return avroSchema;
+  }
 }
 
index 3f35faf..90b910a 100644 (file)
@@ -58,6 +58,11 @@ public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfi
     return AvroParquetOutputFormat.class;
   }
 
+  @Override
+  public boolean isHiveImportNeeded() {
+    return true;
+  }
+
   void configureOutputCodec(Job job) {
     String outputCodec = job.getConfiguration().get(SQOOP_PARQUET_OUTPUT_CODEC_KEY);
     if (outputCodec != null) {
index feb3bf1..7e179a2 100644 (file)
@@ -79,6 +79,11 @@ public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigu
     return DatasetKeyOutputFormat.class;
   }
 
+  @Override
+  public boolean isHiveImportNeeded() {
+    return false;
+  }
+
   private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
     if (options.doHiveImport()) {
       String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
index e505c26..87fc5e9 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.sqoop.tool;
 import static java.lang.String.format;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;
 
 import java.io.File;
@@ -1586,12 +1587,13 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
         + "importing into SequenceFile format.");
     }
 
-    // Hive import and create hive table not compatible for ParquetFile format
+    // Hive import and create hive table not compatible for ParquetFile format when using Kite
     if (options.doHiveImport()
         && options.doFailIfHiveTableExists()
-        && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
+        && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile
+        && options.getParquetConfiguratorImplementation() == KITE) {
       throw new InvalidOptionsException("Hive import and create hive table is not compatible with "
-        + "importing into ParquetFile format.");
+        + "importing into ParquetFile format using Kite.");
       }
 
     if (options.doHiveImport()
@@ -1902,7 +1904,6 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
 
   protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.InvalidOptionsException {
     final String withoutTemplate = "The %s option cannot be used without the %s option.";
-    final String withTemplate = "The %s option cannot be used with the %s option.";
 
     if (isSet(options.getHs2Url()) && !options.doHiveImport()) {
       throw new InvalidOptionsException(format(withoutTemplate, HS2_URL_ARG, HIVE_IMPORT_ARG));
@@ -1915,11 +1916,6 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
     if (isSet(options.getHs2Keytab()) && !isSet(options.getHs2User())) {
       throw  new InvalidOptionsException(format(withoutTemplate, HS2_KEYTAB_ARG, HS2_USER_ARG));
     }
-
-    if (isSet(options.getHs2Url()) && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) {
-      throw  new InvalidOptionsException(format(withTemplate, HS2_URL_ARG, FMT_PARQUETFILE_ARG));
-    }
-
   }
 
   private void applyParquetJobConfigurationImplementation(CommandLine in, SqoopOptions out) throws InvalidOptionsException {
index 25c3f70..f7310b9 100644 (file)
@@ -46,6 +46,7 @@ import org.apache.sqoop.hive.HiveClient;
 import org.apache.sqoop.hive.HiveClientFactory;
 import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.MergeJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
 import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.metastore.JobData;
 import org.apache.sqoop.metastore.JobStorage;
@@ -541,13 +542,9 @@ public class ImportTool extends BaseSqoopTool {
     }
 
     // If the user wants this table to be in Hive, perform that post-load.
-    if (options.doHiveImport()) {
-      // For Parquet file, the import action will create hive table directly via
-      // kite. So there is no need to do hive import as a post step again.
-      if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
-        HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
-        hiveClient.importTable();
-      }
+    if (isHiveImportNeeded(options)) {
+      HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
+      hiveClient.importTable();
     }
 
     saveIncrementalState(options);
@@ -1192,5 +1189,18 @@ public class ImportTool extends BaseSqoopTool {
     validateHCatalogOptions(options);
     validateAccumuloOptions(options);
   }
+
+  private boolean isHiveImportNeeded(SqoopOptions options) {
+    if (!options.doHiveImport()) {
+      return false;
+    }
+
+    if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
+      return true;
+    }
+
+    ParquetJobConfiguratorFactory parquetJobConfigurator = getParquetJobConfigurator(options);
+    return parquetJobConfigurator.createParquetImportJobConfigurator().isHiveImportNeeded();
+  }
 }
 
index d8d3af4..adad0cc 100644 (file)
@@ -26,8 +26,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import parquet.hadoop.metadata.CompressionCodecName;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;
 
@@ -157,13 +155,4 @@ public class TestParquetIncrementalImportMerge extends ImportJobTestCase {
         .withOption("merge-key", mergeKey)
         .withOption("last-value", lastValue);
   }
-
-  private static long timeFromString(String timeStampString) {
-    try {
-      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-      return format.parse(timeStampString).getTime();
-    } catch (ParseException e) {
-      throw new RuntimeException(e);
-    }
-  }
 }
diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2ParquetImport.java b/src/test/org/apache/sqoop/hive/TestHiveServer2ParquetImport.java
new file mode 100644 (file)
index 0000000..b55179a
--- /dev/null
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.HiveServer2TestUtil;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static java.util.Arrays.deepEquals;
+import static org.apache.sqoop.testutil.BaseSqoopTestCase.timeFromString;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Enclosed.class)
+public class TestHiveServer2ParquetImport {
+
+  private static final String[] TEST_COLUMN_NAMES = {"C1_VARCHAR", "C2#INTEGER", "3C_CHAR"};
+
+  private static final String[] TEST_COLUMN_TYPES = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
+
+  private static final String[] TEST_COLUMN_ALL_TYPES = {"INTEGER", "BIGINT", "DOUBLE", "DECIMAL(10, 2)", "BOOLEAN", "TIMESTAMP", "BINARY", "VARCHAR(100)", "CHAR(100)"};
+
+  private static final List<Object> TEST_COLUMN_ALL_TYPES_VALUES = Arrays.<Object>asList(10, 12345678910123L, 12.34, 456842.45, "TRUE", "2018-06-14 15:00:00.000", "abcdef", "testVarchar", "testChar");
+
+  private static final Object[] EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES = {10, 12345678910123L, 12.34, "456842.45", true, timeFromString("2018-06-14 15:00:00.000"), decodeHex("abcdef"), "testVarchar", "testChar"};
+
+  private static final List<Object> TEST_COLUMN_VALUES = Arrays.<Object>asList("test", 42, "somestring");
+
+  private static final List<Object> TEST_COLUMN_VALUES_MAPPED = Arrays.<Object>asList("test", "42", "somestring");
+
+  private static final List<Object> TEST_COLUMN_VALUES_LINE2 = Arrays.<Object>asList("test2", 4242, "somestring2");
+
+  private static HiveMiniCluster hiveMiniCluster;
+
+  private static HiveServer2TestUtil hiveServer2TestUtil;
+
+  @RunWith(Parameterized.class)
+  public static class ParquetCompressionCodecTestCase extends ImportJobTestCase {
+
+    @Parameters(name = "compressionCodec = {0}")
+    public static Iterable<? extends Object> authenticationParameters() {
+      return Arrays.asList("snappy", "gzip");
+    }
+
+    @BeforeClass
+    public static void beforeClass() {
+      startHiveMiniCluster();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+      stopHiveMiniCluster();
+    }
+
+    private final String compressionCodec;
+
+    public ParquetCompressionCodecTestCase(String compressionCodec) {
+      this.compressionCodec = compressionCodec;
+    }
+
+    @Override
+    @Before
+    public void setUp() {
+      super.setUp();
+
+      createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
+    }
+
+    @Test
+    public void testHiveImportAsParquetWithCompressionCodecCanBeLoaded() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("compression-codec", compressionCodec)
+          .build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES));
+    }
+
+    @Test
+    public void testImportedFilesHaveCorrectCodec() throws Exception {
+      Path tablePath = new Path(hiveMiniCluster.getTempFolderPath() + "/" + getTableName().toLowerCase());
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("compression-codec", compressionCodec)
+          .build();
+
+      runImport(args);
+
+      CompressionCodecName codec = new ParquetReader(tablePath).getCodec();
+      assertEquals(compressionCodec, codec.name().toLowerCase());
+    }
+  }
+
+  public static class GeneralParquetTestCase extends ImportJobTestCase {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @BeforeClass
+    public static void beforeClass() {
+      startHiveMiniCluster();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+      stopHiveMiniCluster();
+    }
+
+    @Override
+    @Before
+    public void setUp() {
+      super.setUp();
+
+      createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES);
+    }
+
+    @Test
+    public void testNormalHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName()).build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES));
+    }
+
+    @Test
+    public void testHiveImportAsParquetWithMapColumnJavaAndOriginalColumnNameSucceeds() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-java", "C2#INTEGER=String")
+          .build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES_MAPPED));
+    }
+
+    /**
+     * This test case documents that the Avro identifier(C2_INTEGER)
+     * of a special column name(C2#INTEGER) cannot be used in map-column-java.
+     * The reason is that org.apache.sqoop.orm.AvroSchemaGenerator#toAvroType(java.lang.String, int)
+     * which maps the Avro schema type uses the original column name and
+     * not the Avro identifier but org.apache.sqoop.orm.ClassWriter#toJavaType(java.lang.String, int)
+     * can map the DAO class field types based on the Avro identifier too so there will be a discrepancy
+     * between the generated Avro schema types and the DAO class field types.
+     */
+    @Test
+    public void testHiveImportAsParquetWithMapColumnJavaAndAvroIdentifierFails() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-java", "C2_INTEGER=String")
+          .build();
+
+      expectedException.expect(IOException.class);
+      runImport(args);
+    }
+
+    /**
+     * This test case documents that a mapping with the Avro identifier(C2_INTEGER)
+     * of a special column name(C2#INTEGER) is ignored in map-column-hive.
+     * The reason is that the column type of the Avro schema and the Hive table must
+     * be equal and if we would be able to override the Hive column type using map-column-hive
+     * the inconsistency would cause a Hive error during reading.
+     */
+    @Test
+    public void testHiveImportAsParquetWithMapColumnHiveAndAvroIdentifierIgnoresMapping() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-hive", "C2_INTEGER=STRING")
+          .build();
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES));
+    }
+
+    /**
+     * This test case documents that the special column name(C2#INTEGER)
+     * cannot be used in map-column-hive.
+     * The reason is that Sqoop uses the Avro identifier(C2_INTEGER) as Hive column
+     * name and there is a check in org.apache.sqoop.hive.TableDefWriter#getCreateTableStmt()
+     * which verifies that all the columns in map-column-hive are actually valid column names.
+     * Since C2_INTEGER is used instead of C2#INTEGER the check will fail on the latter.
+     */
+    @Test
+    public void testHiveImportAsParquetWithMapColumnHiveAndOriginalColumnNameFails() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("map-column-hive", "C2#INTEGER=STRING")
+          .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("No column by the name C2#INTEGERfound while importing data");
+
+      runImportThrowingException(args);
+    }
+
+    @Test
+    public void testAllDataTypesHiveImportAsParquet() throws Exception {
+      setCurTableName("all_datatypes_table");
+      createTableWithColTypes(TEST_COLUMN_ALL_TYPES, TEST_COLUMN_ALL_TYPES_VALUES);
+      String[] args = commonArgs(getConnectString(), getTableName()).build();
+
+      runImport(args);
+
+      // The result contains a byte[] so we have to use Arrays.deepEquals() to assert.
+      Object[] firstRow = hiveServer2TestUtil.loadRawRowsFromTable(getTableName()).iterator().next().toArray();
+      assertTrue(deepEquals(EXPECTED_TEST_COLUMN_ALL_TYPES_VALUES, firstRow));
+    }
+
+    @Test
+    public void testAppendHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName()).build();
+
+      runImport(args);
+
+      insertIntoTable(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertThat(rows, hasItems(TEST_COLUMN_VALUES, TEST_COLUMN_VALUES_LINE2));
+    }
+
+    @Test
+    public void testCreateOverwriteHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("hive-overwrite")
+          .build();
+
+      runImport(args);
+
+      // Recreate the test table to contain different test data.
+      dropTableIfExists(getTableName());
+      createTableWithColTypesAndNames(TEST_COLUMN_NAMES, TEST_COLUMN_TYPES, TEST_COLUMN_VALUES_LINE2);
+
+      runImport(args);
+
+      List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+      assertEquals(asList(TEST_COLUMN_VALUES_LINE2), rows);
+    }
+
+    /**
+     * --create-hive-table option is now supported with the Hadoop Parquet writer implementation.
+     */
+    @Test
+    public void testCreateHiveImportAsParquet() throws Exception {
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("create-hive-table")
+          .build();
+
+      runImport(args);
+
+      expectedException.expectMessage("Error executing Hive import.");
+      runImportThrowingException(args);
+    }
+
+    /**
+     * This scenario works fine since the Hadoop Parquet writer implementation does not
+     * check the Parquet schema of the existing files. The exception will be thrown
+     * by Hive when it tries to read the files with different schema.
+     */
+    @Test
+    public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
+      String hiveTableName = "hiveImportAsParquetWhenTableExistsWithIncompatibleSchema";
+      String[] incompatibleSchemaTableTypes = {"INTEGER", "INTEGER", "INTEGER"};
+      List<Object> incompatibleSchemaTableData = Arrays.<Object>asList(100, 200, 300);
+
+      String[] args = commonArgs(getConnectString(), getTableName())
+          .withOption("hive-table", hiveTableName)
+          .build();
+
+      runImport(args);
+
+      // We make sure we create a new table in the test RDBMS.
+      incrementTableNum();
+      createTableWithColTypes(incompatibleSchemaTableTypes, incompatibleSchemaTableData);
+
+      // Recreate the argument array to pick up the new RDBMS table name.
+      args = commonArgs(getConnectString(), getTableName())
+          .withOption("hive-table", hiveTableName)
+          .build();
+
+      runImport(args);
+    }
+
+  }
+
+  private static ArgumentArrayBuilder commonArgs(String connectString, String tableName) {
+    return new ArgumentArrayBuilder()
+        .withProperty("parquetjob.configurator.implementation", "hadoop")
+        .withOption("connect", connectString)
+        .withOption("table", tableName)
+        .withOption("hive-import")
+        .withOption("hs2-url", hiveMiniCluster.getUrl())
+        .withOption("num-mappers", "1")
+        .withOption("as-parquetfile")
+        .withOption("delete-target-dir");
+  }
+
+  public static void startHiveMiniCluster() {
+    hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
+    hiveMiniCluster.start();
+    hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
+  }
+
+  public static void stopHiveMiniCluster() {
+    hiveMiniCluster.stop();
+  }
+
+  private static byte[] decodeHex(String hexString) {
+    try {
+      return Hex.decodeHex(hexString.toCharArray());
+    } catch (DecoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
index 3d115ab..410724f 100644 (file)
@@ -75,6 +75,7 @@ public class TestHiveServer2TextImport extends ImportJobTestCase {
         .withOption("hive-import")
         .withOption("hs2-url", hiveMiniCluster.getUrl())
         .withOption("split-by", getColName(1))
+        .withOption("delete-target-dir")
         .build();
 
     runImport(args);
diff --git a/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java b/src/test/org/apache/sqoop/hive/TestHiveTypesForAvroTypeMapping.java
new file mode 100644 (file)
index 0000000..276e9ea
--- /dev/null
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.avro.Schema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+
+import static org.apache.sqoop.hive.HiveTypes.toHiveType;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestHiveTypesForAvroTypeMapping {
+
+  private final String hiveType;
+  private final Schema.Type avroType;
+
+  @Parameters(name = "hiveType = {0}, avroType = {1}")
+  public static Iterable<? extends Object> parameters() {
+    return Arrays.asList(
+        new Object[] {"BOOLEAN", Schema.Type.BOOLEAN},
+        new Object[] {"INT", Schema.Type.INT},
+        new Object[] {"BIGINT", Schema.Type.LONG},
+        new Object[] {"FLOAT", Schema.Type.FLOAT},
+        new Object[] {"DOUBLE", Schema.Type.DOUBLE},
+        new Object[] {"STRING", Schema.Type.ENUM},
+        new Object[] {"STRING", Schema.Type.STRING},
+        new Object[] {"BINARY", Schema.Type.BYTES},
+        new Object[] {"BINARY", Schema.Type.FIXED});
+  }
+
+  public TestHiveTypesForAvroTypeMapping(String hiveType, Schema.Type avroType) {
+    this.hiveType = hiveType;
+    this.avroType = avroType;
+  }
+
+  @Test
+  public void testAvroTypeToHiveTypeMapping() throws Exception {
+    assertEquals(hiveType, toHiveType(avroType));
+  }
+}
index 3ea61f6..626ad22 100644 (file)
@@ -36,6 +36,8 @@ import org.junit.rules.ExpectedException;
 
 import java.sql.Types;
 
+import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -51,6 +53,10 @@ import static org.mockito.Mockito.when;
  */
 public class TestTableDefWriter {
 
+  private static final String TEST_AVRO_SCHEMA = "{\"type\":\"record\",\"name\":\"IMPORT_TABLE_1\",\"fields\":[{\"name\":\"C1_VARCHAR\",\"type\":[\"null\",\"string\"]},{\"name\":\"C2_INTEGER\",\"type\":[\"null\",\"int\"]},{\"name\":\"_3C_CHAR\",\"type\":[\"null\",\"string\"]}]}";
+
+  private static final String EXPECTED_CREATE_PARQUET_TABLE_STMNT = "CREATE TABLE IF NOT EXISTS `outputTable` ( `C1_VARCHAR` STRING, `C2_INTEGER` INT, `_3C_CHAR` STRING) STORED AS PARQUET";
+
   public static final Log LOG = LogFactory.getLog(
       TestTableDefWriter.class.getName());
 
@@ -256,6 +262,14 @@ public class TestTableDefWriter {
     verify(connManager).discardConnection(true);
   }
 
+  @Test
+  public void testGetCreateTableStmtWithAvroSchema() throws Exception {
+    options.setFileLayout(ParquetFile);
+    options.getConf().set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, TEST_AVRO_SCHEMA);
+
+    assertEquals(EXPECTED_CREATE_PARQUET_TABLE_STMNT, writer.getCreateTableStmt());
+  }
+
   private void setUpMockConnManager(String tableName, Map<String, Integer> typeMap) {
     when(connManager.getColumnTypes(tableName)).thenReturn(typeMap);
     when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{}));
index ac6db0b..1730698 100644 (file)
@@ -41,6 +41,8 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;
 
@@ -322,6 +324,10 @@ public abstract class BaseSqoopTestCase {
     createTableWithColTypesAndNames(getTableName(), colNames, colTypes, vals);
   }
 
+  protected void createTableWithColTypesAndNames(String[] colNames, String[] colTypes, List<Object> record) {
+    createTableWithColTypesAndNames(getTableName(), colNames, colTypes, toStringArray(record));
+  }
+
   /**
    * Create a table with a set of columns with their names and add a row of values.
    * @param newTableName The name of the new table
@@ -439,6 +445,10 @@ public abstract class BaseSqoopTestCase {
     }
   }
 
+  protected void insertIntoTable(String[] columns, String[] colTypes, List<Object> record) {
+    insertIntoTable(columns, colTypes, toStringArray(record));
+  }
+
   protected void insertIntoTable(String[] columns, String[] colTypes, String[] vals) {
     assert colTypes != null;
     assert colTypes.length == vals.length;
@@ -674,4 +684,13 @@ public abstract class BaseSqoopTestCase {
 
     return result;
   }
+
+  public static long timeFromString(String timeStampString) {
+    try {
+      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+      return format.parse(timeStampString).getTime();
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
index 4d3f938..ed4b5a4 100644 (file)
@@ -137,16 +137,4 @@ public class TestHiveServer2OptionValidations {
     sqoopTool.validateOptions(sqoopOptions);
   }
 
-  @Test
-  public void testValidateOptionsFailsWhenHs2UrlIsUsedWithParquetFormat() throws Exception {
-    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
-    expectedException.expectMessage("The hs2-url option cannot be used with the as-parquetfile option.");
-
-    when(sqoopOptions.doHiveImport()).thenReturn(true);
-    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
-    when(sqoopOptions.getFileLayout()).thenReturn(ParquetFile);
-
-    sqoopTool.validateOptions(sqoopOptions);
-  }
-
 }