SQOOP-3309: Implement HiveServer2 client
authorBoglarka Egyed <bogi@apache.org>
Wed, 18 Apr 2018 13:59:33 +0000 (15:59 +0200)
committerBoglarka Egyed <bogi@apache.org>
Wed, 18 Apr 2018 13:59:33 +0000 (15:59 +0200)
(Szabolcs Vasas via Boglarka Egyed)

31 files changed:
build.xml
ivy.xml
src/docs/user/hive-args.txt
src/docs/user/hive.txt
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/hive/HiveClient.java [new file with mode: 0644]
src/java/org/apache/sqoop/hive/HiveClientCommon.java [new file with mode: 0644]
src/java/org/apache/sqoop/hive/HiveClientFactory.java [new file with mode: 0644]
src/java/org/apache/sqoop/hive/HiveImport.java
src/java/org/apache/sqoop/hive/HiveServer2Client.java [new file with mode: 0644]
src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactory.java [new file with mode: 0644]
src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializer.java [new file with mode: 0644]
src/java/org/apache/sqoop/hive/TableDefWriter.java
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/CreateHiveTableTool.java
src/java/org/apache/sqoop/tool/ImportAllTablesTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/test/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializerTest.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestHiveClientFactory.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestHiveMiniCluster.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestHiveServer2Client.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/TestTableDefWriter.java
src/test/org/apache/sqoop/hive/minicluster/AuthenticationConfiguration.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/minicluster/HiveMiniCluster.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/minicluster/KerberosAuthenticationConfiguration.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/minicluster/NoAuthenticationConfiguration.java [new file with mode: 0644]
src/test/org/apache/sqoop/hive/minicluster/PasswordAuthenticationConfiguration.java [new file with mode: 0644]
src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java [new file with mode: 0644]
src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java [new file with mode: 0644]
src/test/org/apache/sqoop/tool/TestImportTool.java

index 7f68b57..a85705f 100644 (file)
--- a/build.xml
+++ b/build.xml
       <!-- enable asserts in tests -->
       <jvmarg value="-ea" />
 
+      <!-- We need to disable asserts in HadoopThriftAuthBridge to be able to run HiveMiniCluster tests. -->
+      <jvmarg value="-da:org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge" />
+
       <jvmarg value="${remoteDebugJvmArgs}"/>
 
       <sysproperty key="test.build.data" value="${build.test}/data"/>
diff --git a/ivy.xml b/ivy.xml
index 6be4fa2..6af94d9 100644 (file)
--- a/ivy.xml
+++ b/ivy.xml
@@ -195,6 +195,8 @@ under the License.
       <exclude org="org.apache.avro" module="avro" />
     </dependency>
 
+    <dependency org="org.apache.hive" name="hive-jdbc" rev="${hcatalog.version}" conf="common->default" />
+
     <dependency org="org.apache.hive.hcatalog" name="hive-hcatalog-core"
       rev="${hcatalog.version}" conf="common->default">
       <artifact name="hive-hcatalog-core" type="jar"/>
index 441f54e..7509564 100644 (file)
@@ -43,5 +43,10 @@ Argument                      Description
                   Hive type for configured columns. If specify commas in\
                   this argument, use URL encoded keys and values, for example,\
                   use DECIMAL(1%2C%201) instead of DECIMAL(1, 1).
++\--hs2-url+                  The JDBC connection string to HiveServer2 as you would specify in Beeline. If you use this option with \
+                    --hive-import then Sqoop will try to connect to HiveServer2 instead of using Hive CLI.
++\--hs2-user+                 The user for creating the JDBC connection to HiveServer2. The default is the current OS user.
++\--hs2-keytab+               The path to the keytab file of the user connecting to HiveServer2. If you choose another \
+                       HiveServer2 user (with --hs2-user) then --hs2-keytab has to be also specified otherwise it can be omitted.
 --------------------------------------------------------------------------
 
index 3dc8bb4..f8f7c27 100644 (file)
@@ -35,12 +35,22 @@ omitted, Sqoop will generate a Hive script containing a +CREATE TABLE+
 operation defining your columns using Hive's types, and a +LOAD DATA INPATH+
 statement to move the data files into Hive's warehouse directory.
 
-The script will be executed by calling
+The script can be executed in two ways:
+
+- By the default the script will be executed by calling
 the installed copy of hive on the machine where Sqoop is run. If you have
 multiple Hive installations, or +hive+ is not in your +$PATH+, use the
 *+\--hive-home+* option to identify the Hive installation directory.
 Sqoop will use +$HIVE_HOME/bin/hive+ from here.
 
+- If the user specifies the *+\--hs2-url+* parameter then the script will
+ be sent to HiveServer2 through a JDBC connection. Note that the data itself
+ will not be transferred via the JDBC connection it is written directly to HDFS
+ just like in case of the default hive import. As HiveServer2 provides proper
+ authorization and auditing features it is recommended to use this instead of
+ the default. Currently only Kerberos authentication and text file format is
+ supported with this option.
+
 NOTE: This function is incompatible with +\--as-avrodatafile+ and
 +\--as-sequencefile+.
 
index 651cebd..d9984af 100644 (file)
@@ -449,6 +449,15 @@ public class SqoopOptions implements Cloneable {
   private String metaUsername;
   private String metaPassword;
 
+  @StoredAsProperty("hs2.url")
+  private String hs2Url;
+
+  @StoredAsProperty("hs2.user")
+  private String hs2User;
+
+  @StoredAsProperty("hs2.keytab")
+  private String hs2Keytab;
+
   public SqoopOptions() {
     initDefaults(null);
   }
@@ -2892,5 +2901,29 @@ public class SqoopOptions implements Cloneable {
     this.metaPassword = metaPassword;
   }
 
+  public String getHs2Url() {
+    return hs2Url;
+  }
+
+  public void setHs2Url(String hs2Url) {
+    this.hs2Url = hs2Url;
+  }
+
+  public String getHs2User() {
+    return hs2User;
+  }
+
+  public void setHs2User(String hs2User) {
+    this.hs2User = hs2User;
+  }
+
+  public String getHs2Keytab() {
+    return hs2Keytab;
+  }
+
+  public void setHs2Keytab(String hs2Keytab) {
+    this.hs2Keytab = hs2Keytab;
+  }
+
 }
 
diff --git a/src/java/org/apache/sqoop/hive/HiveClient.java b/src/java/org/apache/sqoop/hive/HiveClient.java
new file mode 100644 (file)
index 0000000..994f858
--- /dev/null
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.io.IOException;
+
+public interface HiveClient {
+
+  void importTable() throws IOException;
+
+  void createTable() throws IOException;
+}
diff --git a/src/java/org/apache/sqoop/hive/HiveClientCommon.java b/src/java/org/apache/sqoop/hive/HiveClientCommon.java
new file mode 100644 (file)
index 0000000..952db6d
--- /dev/null
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.io.CodecMap;
+
+import java.io.IOException;
+
+/**
+ * Class containing the common logic for different HiveClient implementations.
+ */
+public class HiveClientCommon {
+
+  public static final Log LOG = LogFactory.getLog(HiveClientCommon.class.getName());
+
+  /**
+   * If we used a MapReduce-based upload of the data, remove the _logs dir
+   * from where we put it, before running Hive LOAD DATA INPATH.
+   */
+  public void removeTempLogs(Configuration configuration, Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(configuration);
+    Path logsPath = new Path(tablePath, "_logs");
+    if (fs.exists(logsPath)) {
+      LOG.info("Removing temporary files from import process: " + logsPath);
+      if (!fs.delete(logsPath, true)) {
+        LOG.warn("Could not delete temporary files; "
+            + "continuing with import, but it may fail.");
+      }
+    }
+  }
+
+  /**
+   * Clean up after successful HIVE import.
+   *
+   * @param outputPath path to the output directory
+   * @throws IOException
+   */
+  public void cleanUp(Configuration configuration, Path outputPath) throws IOException {
+    FileSystem fs = outputPath.getFileSystem(configuration);
+
+    // HIVE is not always removing input directory after LOAD DATA statement
+    // (which is our export directory). We're removing export directory in case
+    // that is blank for case that user wants to periodically populate HIVE
+    // table (for example with --hive-overwrite).
+    try {
+      if (outputPath != null && fs.exists(outputPath)) {
+        FileStatus[] statuses = fs.listStatus(outputPath);
+        if (statuses.length == 0) {
+          LOG.info("Export directory is empty, removing it.");
+          fs.delete(outputPath, true);
+        } else if (statuses.length == 1 && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+          LOG.info("Export directory is contains the _SUCCESS file only, removing the directory.");
+          fs.delete(outputPath, true);
+        } else {
+          LOG.info("Export directory is not empty, keeping it.");
+        }
+      }
+    } catch(IOException e) {
+      LOG.error("Issue with cleaning (safe to ignore)", e);
+    }
+  }
+
+  public void indexLzoFiles(SqoopOptions sqoopOptions, Path finalPath) throws IOException {
+    String codec = sqoopOptions.getCompressionCodec();
+    if (codec != null && (codec.equals(CodecMap.LZOP)
+        || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+      try {
+        Tool tool = ReflectionUtils.newInstance(Class.
+            forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
+            asSubclass(Tool.class), sqoopOptions.getConf());
+        ToolRunner.run(sqoopOptions.getConf(), tool,
+            new String[]{finalPath.toString()});
+      } catch (Exception ex) {
+        LOG.error("Error indexing lzo files", ex);
+        throw new IOException("Error indexing lzo files", ex);
+      }
+    }
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/hive/HiveClientFactory.java b/src/java/org/apache/sqoop/hive/HiveClientFactory.java
new file mode 100644 (file)
index 0000000..67de8f0
--- /dev/null
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.authentication.KerberosAuthenticator;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
+import org.apache.sqoop.manager.ConnManager;
+
+import java.io.IOException;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public class HiveClientFactory {
+
+  private final HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer;
+
+  public HiveClientFactory(HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer) {
+    this.connectionFactoryInitializer = connectionFactoryInitializer;
+  }
+
+  public HiveClientFactory() {
+    this(new HiveServer2ConnectionFactoryInitializer());
+  }
+
+  public HiveClient createHiveClient(SqoopOptions sqoopOptions, ConnManager connManager) {
+    if (useHiveCli(sqoopOptions)) {
+      return createHiveImport(sqoopOptions, connManager);
+    } else {
+      return createHiveServer2Client(sqoopOptions, connManager);
+    }
+  }
+
+  private HiveClient createHiveImport(SqoopOptions sqoopOptions, ConnManager connManager) {
+    return new HiveImport(sqoopOptions, connManager, sqoopOptions.getConf(), false);
+  }
+
+  private HiveClient createHiveServer2Client(SqoopOptions sqoopOptions, ConnManager connManager) {
+    TableDefWriter tableDefWriter = createTableDefWriter(sqoopOptions, connManager);
+    JdbcConnectionFactory hs2JdbcConnectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+    return new HiveServer2Client(sqoopOptions, tableDefWriter, hs2JdbcConnectionFactory);
+  }
+
+  TableDefWriter createTableDefWriter(SqoopOptions sqoopOptions, ConnManager connManager) {
+    return new TableDefWriter(sqoopOptions, connManager, sqoopOptions.getTableName(), sqoopOptions.getHiveTableName(), sqoopOptions.getConf(), false);
+  }
+
+  private boolean useHiveCli(SqoopOptions sqoopOptions) {
+    return StringUtils.isEmpty(sqoopOptions.getHs2Url());
+  }
+
+}
index c272911..5da00a7 100644 (file)
@@ -31,16 +31,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Tool;
-import org.apache.sqoop.io.CodecMap;
 import org.apache.sqoop.util.Executor;
 import org.apache.sqoop.util.LoggingAsyncSink;
 import org.apache.sqoop.util.SubprocessSecurityManager;
@@ -54,7 +47,7 @@ import org.apache.sqoop.util.ExitSecurityException;
  * to Hive itself as well as orchestrating the use of the other classes in this
  * package.
  */
-public class HiveImport {
+public class HiveImport implements HiveClient {
 
   public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
 
@@ -62,6 +55,7 @@ public class HiveImport {
   private ConnManager connManager;
   private Configuration configuration;
   private boolean generateOnly;
+  private HiveClientCommon hiveClientCommon;
   private static boolean testMode = false;
 
   public static boolean getTestMode() {
@@ -77,13 +71,17 @@ public class HiveImport {
       "org.apache.hadoop.hive.cli.CliDriver";
 
   public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
-      final Configuration conf, final boolean generateOnly) {
+      final Configuration conf, final boolean generateOnly, final HiveClientCommon hiveClientCommon) {
     this.options = opts;
     this.connManager = connMgr;
     this.configuration = conf;
     this.generateOnly = generateOnly;
+    this.hiveClientCommon = hiveClientCommon;
   }
 
+  public HiveImport(SqoopOptions opts, ConnManager connMgr, Configuration conf, boolean generateOnly) {
+    this(opts, connMgr, conf, generateOnly, new HiveClientCommon());
+  }
 
   /**
    * @return the filename of the hive executable to run to do the import
@@ -111,27 +109,11 @@ public class HiveImport {
   }
 
   /**
-   * If we used a MapReduce-based upload of the data, remove the _logs dir
-   * from where we put it, before running Hive LOAD DATA INPATH.
-   */
-  private void removeTempLogs(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(configuration);
-    Path logsPath = new Path(tablePath, "_logs");
-    if (fs.exists(logsPath)) {
-      LOG.info("Removing temporary files from import process: " + logsPath);
-      if (!fs.delete(logsPath, true)) {
-        LOG.warn("Could not delete temporary files; "
-            + "continuing with import, but it may fail.");
-      }
-    }
-  }
-
-  /**
    * @return true if we're just generating the DDL for the import, but
    * not actually running it (i.e., --generate-only mode). If so, don't
    * do any side-effecting actions in Hive.
    */
-  private boolean isGenerateOnly() {
+  boolean isGenerateOnly() {
     return generateOnly;
   }
 
@@ -181,8 +163,6 @@ public class HiveImport {
     }
 
     // generate the HQL statements to run.
-    // reset the connection as it might have timed out
-    connManager.discardConnection(true);
     TableDefWriter tableWriter = new TableDefWriter(options, connManager,
         inputTableName, outputTableName,
         configuration, !debugMode);
@@ -191,23 +171,10 @@ public class HiveImport {
     Path finalPath = tableWriter.getFinalPath();
 
     if (!isGenerateOnly()) {
-      removeTempLogs(finalPath);
+      hiveClientCommon.removeTempLogs(configuration, finalPath);
       LOG.info("Loading uploaded data into Hive");
 
-      String codec = options.getCompressionCodec();
-      if (codec != null && (codec.equals(CodecMap.LZOP)
-              || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
-        try {
-          Tool tool = ReflectionUtils.newInstance(Class.
-                  forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
-                  asSubclass(Tool.class), configuration);
-          ToolRunner.run(configuration, tool,
-              new String[] { finalPath.toString() });
-        } catch (Exception ex) {
-          LOG.error("Error indexing lzo files", ex);
-          throw new IOException("Error indexing lzo files", ex);
-        }
-      }
+      hiveClientCommon.indexLzoFiles(options, finalPath);
     }
 
     // write them to a script file.
@@ -242,7 +209,7 @@ public class HiveImport {
 
         LOG.info("Hive import complete.");
 
-        cleanUp(finalPath);
+        hiveClientCommon.cleanUp(configuration, finalPath);
       }
     } finally {
       if (!isGenerateOnly()) {
@@ -256,37 +223,6 @@ public class HiveImport {
     }
   }
 
-  /**
-   * Clean up after successful HIVE import.
-   *
-   * @param outputPath path to the output directory
-   * @throws IOException
-   */
-  private void cleanUp(Path outputPath) throws IOException {
-    FileSystem fs = outputPath.getFileSystem(configuration);
-
-    // HIVE is not always removing input directory after LOAD DATA statement
-    // (which is our export directory). We're removing export directory in case
-    // that is blank for case that user wants to periodically populate HIVE
-    // table (for example with --hive-overwrite).
-    try {
-      if (outputPath != null && fs.exists(outputPath)) {
-        FileStatus[] statuses = fs.listStatus(outputPath);
-        if (statuses.length == 0) {
-          LOG.info("Export directory is empty, removing it.");
-          fs.delete(outputPath, true);
-        } else if (statuses.length == 1 && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
-          LOG.info("Export directory is contains the _SUCCESS file only, removing the directory.");
-          fs.delete(outputPath, true);
-        } else {
-          LOG.info("Export directory is not empty, keeping it.");
-        }
-      }
-    } catch(IOException e) {
-      LOG.error("Issue with cleaning (safe to ignore)", e);
-    }
-  }
-
   @SuppressWarnings("unchecked")
   /**
    * Execute the script file via Hive.
@@ -398,5 +334,28 @@ public class HiveImport {
 
     return newArgs.toArray(new String[newArgs.size()]);
   }
+
+  @Override
+  public void importTable() throws IOException {
+    importTable(options.getTableName(), options.getHiveTableName(), false);
+  }
+
+  @Override
+  public void createTable() throws IOException {
+    importTable(options.getTableName(), options.getHiveTableName(), true);
+  }
+
+  SqoopOptions getOptions() {
+    return options;
+  }
+
+  ConnManager getConnManager() {
+    return connManager;
+  }
+
+  Configuration getConfiguration() {
+    return configuration;
+  }
+
 }
 
diff --git a/src/java/org/apache/sqoop/hive/HiveServer2Client.java b/src/java/org/apache/sqoop/hive/HiveServer2Client.java
new file mode 100644 (file)
index 0000000..c4976c1
--- /dev/null
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+public class HiveServer2Client implements HiveClient {
+
+  private static final Log LOG = LogFactory.getLog(HiveServer2Client.class.getName());
+
+  private final SqoopOptions sqoopOptions;
+
+  private final TableDefWriter tableDefWriter;
+
+  private final JdbcConnectionFactory hs2ConnectionFactory;
+
+  private final HiveClientCommon hiveClientCommon;
+
+  public HiveServer2Client(SqoopOptions sqoopOptions, TableDefWriter tableDefWriter, JdbcConnectionFactory hs2ConnectionFactory, HiveClientCommon hiveClientCommon) {
+    this.sqoopOptions = sqoopOptions;
+    this.tableDefWriter = tableDefWriter;
+    this.hs2ConnectionFactory = hs2ConnectionFactory;
+    this.hiveClientCommon = hiveClientCommon;
+  }
+
+  public HiveServer2Client(SqoopOptions sqoopOptions, TableDefWriter tableDefWriter, JdbcConnectionFactory hs2ConnectionFactory) {
+    this(sqoopOptions, tableDefWriter, hs2ConnectionFactory, new HiveClientCommon());
+  }
+
+  @Override
+  public void importTable() throws IOException {
+    LOG.info("Loading uploaded data into Hive.");
+    String createTableStmt = tableDefWriter.getCreateTableStmt();
+    String loadDataStmt = tableDefWriter.getLoadDataStmt();
+    executeHiveImport(asList(createTableStmt, loadDataStmt));
+    LOG.info("Hive import complete.");
+  }
+
+  @Override
+  public void createTable() throws IOException {
+    LOG.info("Creating Hive table: " + tableDefWriter.getOutputTableName());
+    String createTableStmt = tableDefWriter.getCreateTableStmt();
+    executeHiveImport(asList(createTableStmt));
+    LOG.info("Hive table is successfully created.");
+  }
+
+  void executeHiveImport(List<String> commands) throws IOException {
+    Path finalPath = tableDefWriter.getFinalPath();
+
+    hiveClientCommon.removeTempLogs(sqoopOptions.getConf(), finalPath);
+
+    hiveClientCommon.indexLzoFiles(sqoopOptions, finalPath);
+
+    try {
+      executeCommands(commands);
+    } catch (SQLException e) {
+      throw new RuntimeException("Error executing Hive import.", e);
+    }
+
+    hiveClientCommon.cleanUp(sqoopOptions.getConf(), finalPath);
+  }
+
+  void executeCommands(List<String> commands) throws SQLException {
+    try (Connection hs2Connection = hs2ConnectionFactory.createConnection()) {
+      for (String command : commands) {
+        LOG.debug("Executing command: " + command);
+        try (PreparedStatement statement = hs2Connection.prepareStatement(command)) {
+          statement.execute();
+        }
+      }
+    }
+  }
+
+  SqoopOptions getSqoopOptions() {
+    return sqoopOptions;
+  }
+
+  TableDefWriter getTableDefWriter() {
+    return tableDefWriter;
+  }
+
+  JdbcConnectionFactory getHs2ConnectionFactory() {
+    return hs2ConnectionFactory;
+  }
+}
diff --git a/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactory.java b/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactory.java
new file mode 100644 (file)
index 0000000..10515a0
--- /dev/null
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.db.DriverManagerJdbcConnectionFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
+public class HiveServer2ConnectionFactory extends DriverManagerJdbcConnectionFactory {
+
+  private static final Log LOG = LogFactory.getLog(HiveServer2ConnectionFactory.class.getName());
+
+  private static final String HS2_DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
+
+  public HiveServer2ConnectionFactory(String connectionString, String username, String password) {
+    super(HS2_DRIVER_CLASS, connectionString, username, password);
+  }
+
+  public HiveServer2ConnectionFactory(String connectionString, String username) {
+    this(connectionString, username, null);
+  }
+
+  @Override
+  public Connection createConnection() {
+    LOG.info("Creating connection to HiveServer2 as: " + getCurrentUser());
+    return super.createConnection();
+  }
+
+  private String getCurrentUser() {
+    try {
+      return UserGroupInformation.getCurrentUser().toString();
+    } catch (IOException e) {
+      LOG.error("Unable to determine current user.", e);
+    }
+    return EMPTY;
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializer.java b/src/java/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializer.java
new file mode 100644 (file)
index 0000000..1d959f9
--- /dev/null
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.authentication.KerberosAuthenticator;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
+
+import java.io.IOException;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public class HiveServer2ConnectionFactoryInitializer {
+
+  public JdbcConnectionFactory createJdbcConnectionFactory(SqoopOptions sqoopOptions) {
+    String connectionUsername = determineConnectionUsername(sqoopOptions);
+    JdbcConnectionFactory connectionFactory = new HiveServer2ConnectionFactory(sqoopOptions.getHs2Url(), connectionUsername);
+    if (useKerberizedConnection(sqoopOptions)) {
+      KerberosAuthenticator authenticator = createKerberosAuthenticator(sqoopOptions);
+      connectionFactory = new KerberizedConnectionFactoryDecorator(connectionFactory, authenticator);
+    }
+    return connectionFactory;
+  }
+
+  private String determineConnectionUsername(SqoopOptions sqoopOptions) {
+    if (!isEmpty(sqoopOptions.getHs2User())) {
+      return sqoopOptions.getHs2User();
+    }
+    try {
+      return UserGroupInformation.getLoginUser().getUserName();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to determine login user.", e);
+    }
+  }
+
+  private KerberosAuthenticator createKerberosAuthenticator(SqoopOptions sqoopOptions) {
+    return new KerberosAuthenticator(sqoopOptions.getConf(), sqoopOptions.getHs2User(), sqoopOptions.getHs2Keytab());
+  }
+
+  private boolean useKerberizedConnection(SqoopOptions sqoopOptions) {
+    return !isBlank(sqoopOptions.getHs2Keytab());
+  }
+
+}
index b7a25b7..27d988c 100644 (file)
@@ -96,6 +96,7 @@ public class TableDefWriter {
    * @return the CREATE TABLE statement for the table to load into hive.
    */
   public String getCreateTableStmt() throws IOException {
+    resetConnManager();
     Map<String, Integer> columnTypes;
     Properties userMapping = options.getMapColumnHive();
     Boolean isHiveExternalTableSet = !StringUtils.isBlank(options.getHiveExternalTableDir());
@@ -286,5 +287,38 @@ public class TableDefWriter {
     return String.format("\\%03o", charNum);
   }
 
+  /**
+   * The JDBC connection owned by the ConnManager has been most probably opened when the import was started
+   * so it might have timed out by the time TableDefWriter methods are invoked which happens at the end of import.
+   * The task of this method is to discard the current connection held by ConnManager to make sure
+   * that TableDefWriter will have a working one.
+   */
+  private void resetConnManager() {
+    this.connManager.discardConnection(true);
+  }
+
+  SqoopOptions getOptions() {
+    return options;
+  }
+
+  ConnManager getConnManager() {
+    return connManager;
+  }
+
+  Configuration getConfiguration() {
+    return configuration;
+  }
+
+  String getInputTableName() {
+    return inputTableName;
+  }
+
+  String getOutputTableName() {
+    return outputTableName;
+  }
+
+  boolean isCommentsEnabled() {
+    return commentsEnabled;
+  }
 }
 
index b02e4fe..783651a 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.tool;
 
+import static java.lang.String.format;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -131,6 +133,9 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
   public static final String HCATALOG_STORAGE_STANZA_ARG =
     "hcatalog-storage-stanza";
   public static final String HCATALOG_HOME_ARG = "hcatalog-home";
+  public static final String HS2_URL_ARG = "hs2-url";
+  public static final String HS2_USER_ARG = "hs2-user";
+  public static final String HS2_KEYTAB_ARG = "hs2-keytab";
   public static final String MAPREDUCE_JOB_NAME = "mapreduce-job-name";
   public static final String NUM_MAPPERS_ARG = "num-mappers";
   public static final String NUM_MAPPERS_SHORT_ARG = "m";
@@ -609,6 +614,21 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
           + " types.")
         .withLongOpt(MAP_COLUMN_HIVE)
         .create());
+    hiveOpts.addOption(OptionBuilder
+        .hasArg()
+        .withDescription("The URL to the HiveServer2.")
+        .withLongOpt(HS2_URL_ARG)
+        .create());
+    hiveOpts.addOption(OptionBuilder
+        .hasArg()
+        .withDescription("The user/principal for HiveServer2.")
+        .withLongOpt(HS2_USER_ARG)
+        .create());
+    hiveOpts.addOption(OptionBuilder
+        .hasArg()
+        .withDescription("The location of the keytab of the HiveServer2 user.")
+        .withLongOpt(HS2_KEYTAB_ARG)
+        .create());
 
     return hiveOpts;
   }
@@ -1238,6 +1258,15 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
    if (in.hasOption(HIVE_EXTERNAL_TABLE_LOCATION_ARG)) {
      out.setHiveExternalTableDir(in.getOptionValue(HIVE_EXTERNAL_TABLE_LOCATION_ARG));
    }
+   if (in.hasOption(HS2_URL_ARG)) {
+      out.setHs2Url(in.getOptionValue(HS2_URL_ARG));
+   }
+   if (in.hasOption(HS2_USER_ARG)) {
+      out.setHs2User(in.getOptionValue(HS2_USER_ARG));
+   }
+   if (in.hasOption(HS2_KEYTAB_ARG)) {
+      out.setHs2Keytab(in.getOptionValue(HS2_KEYTAB_ARG));
+   }
 
   }
 
@@ -1618,6 +1647,8 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
       throw new InvalidOptionsException("Importing to external Hive table requires --hive-import parameter to be set."
           + HELP_STR);
     }
+
+    validateHS2Options(options);
   }
 
   protected void validateAccumuloOptions(SqoopOptions options)
@@ -1851,5 +1882,26 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
           "Was called with the --direct option, but no direct connector available.");
     }
   }
-}
 
+  protected void validateHS2Options(SqoopOptions options) throws SqoopOptions.InvalidOptionsException {
+    final String withoutTemplate = "The %s option cannot be used without the %s option.";
+    final String withTemplate = "The %s option cannot be used with the %s option.";
+
+    if (isSet(options.getHs2Url()) && !options.doHiveImport()) {
+      throw new InvalidOptionsException(format(withoutTemplate, HS2_URL_ARG, HIVE_IMPORT_ARG));
+    }
+
+    if (isSet(options.getHs2User()) && !isSet(options.getHs2Url())) {
+      throw  new InvalidOptionsException(format(withoutTemplate, HS2_USER_ARG, HS2_URL_ARG));
+    }
+
+    if (isSet(options.getHs2Keytab()) && !isSet(options.getHs2User())) {
+      throw  new InvalidOptionsException(format(withoutTemplate, HS2_KEYTAB_ARG, HS2_USER_ARG));
+    }
+
+    if (isSet(options.getHs2Url()) && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) {
+      throw  new InvalidOptionsException(format(withTemplate, HS2_URL_ARG, FMT_PARQUETFILE_ARG));
+    }
+
+  }
+}
index d259566..02f5d2d 100644 (file)
@@ -30,7 +30,8 @@ import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
 import org.apache.sqoop.cli.RelatedOptions;
 import org.apache.sqoop.cli.ToolOptions;
-import org.apache.sqoop.hive.HiveImport;
+import org.apache.sqoop.hive.HiveClient;
+import org.apache.sqoop.hive.HiveClientFactory;
 
 /**
  * Tool that creates a Hive table definition.
@@ -40,8 +41,15 @@ public class CreateHiveTableTool extends BaseSqoopTool {
   public static final Log LOG = LogFactory.getLog(
       CreateHiveTableTool.class.getName());
 
-  public CreateHiveTableTool() {
+  private final HiveClientFactory hiveClientFactory;
+
+  public CreateHiveTableTool(HiveClientFactory hiveClientFactory) {
     super("create-hive-table");
+    this.hiveClientFactory = hiveClientFactory;
+  }
+
+  public CreateHiveTableTool() {
+    this(new HiveClientFactory());
   }
 
   @Override
@@ -52,10 +60,8 @@ public class CreateHiveTableTool extends BaseSqoopTool {
     }
 
     try {
-      HiveImport hiveImport = new HiveImport(options, manager,
-          options.getConf(), false);
-      hiveImport.importTable(options.getTableName(),
-          options.getHiveTableName(), true);
+      HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
+      hiveClient.createTable();
     } catch (IOException ioe) {
       LOG.error("Encountered IOException running create table job: "
           + StringUtils.stringifyException(ioe));
index 18f7a0a..6fb4a66 100644 (file)
@@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
 import org.apache.sqoop.cli.RelatedOptions;
-import org.apache.sqoop.hive.HiveImport;
 import org.apache.sqoop.util.ImportException;
 
 /**
@@ -75,7 +74,6 @@ public class ImportAllTablesTool extends ImportTool {
   @Override
   /** {@inheritDoc} */
   public int run(SqoopOptions options) {
-    HiveImport hiveImport = null;
     Set<String> excludes = new HashSet<String>();
 
     if (!init(options)) {
@@ -83,9 +81,6 @@ public class ImportAllTablesTool extends ImportTool {
     }
 
     try {
-      if (options.doHiveImport()) {
-        hiveImport = new HiveImport(options, manager, options.getConf(), false);
-      }
 
       if (options.getAllTablesExclude() != null) {
         excludes.addAll(Arrays.asList(options.getAllTablesExclude().split(",")));
@@ -102,7 +97,8 @@ public class ImportAllTablesTool extends ImportTool {
             System.out.println("Skipping table: " + tableName);
           } else {
             SqoopOptions clonedOptions = (SqoopOptions) options.clone();
-            importTable(clonedOptions, tableName, hiveImport);
+            clonedOptions.setTableName(tableName);
+            importTable(clonedOptions);
           }
         }
       }
index e992005..ee79d8b 100644 (file)
@@ -42,7 +42,8 @@ import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
 import org.apache.sqoop.cli.RelatedOptions;
 import org.apache.sqoop.cli.ToolOptions;
-import org.apache.sqoop.hive.HiveImport;
+import org.apache.sqoop.hive.HiveClient;
+import org.apache.sqoop.hive.HiveClientFactory;
 import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.MergeJob;
 import org.apache.sqoop.metastore.JobData;
@@ -78,18 +79,21 @@ public class ImportTool extends BaseSqoopTool {
   // Set classloader for local job runner
   private ClassLoader prevClassLoader = null;
 
+  private final HiveClientFactory hiveClientFactory;
+
   public ImportTool() {
     this("import", false);
   }
 
   public ImportTool(String toolName, boolean allTables) {
-    this(toolName, new CodeGenTool(), allTables);
+    this(toolName, new CodeGenTool(), allTables, new HiveClientFactory());
   }
 
-  public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) {
+  public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables, HiveClientFactory hiveClientFactory) {
     super(toolName);
     this.codeGenerator = codeGenerator;
     this.allTables = allTables;
+    this.hiveClientFactory = hiveClientFactory;
   }
 
   @Override
@@ -499,17 +503,16 @@ public class ImportTool extends BaseSqoopTool {
    * Import a table or query.
    * @return true if an import was performed, false otherwise.
    */
-  protected boolean importTable(SqoopOptions options, String tableName,
-      HiveImport hiveImport) throws IOException, ImportException {
+  protected boolean importTable(SqoopOptions options) throws IOException, ImportException {
     String jarFile = null;
 
     // Generate the ORM code for the tables.
-    jarFile = codeGenerator.generateORM(options, tableName);
+    jarFile = codeGenerator.generateORM(options, options.getTableName());
 
-    Path outputPath = getOutputPath(options, tableName);
+    Path outputPath = getOutputPath(options, options.getTableName());
 
     // Do the actual import.
-    ImportJobContext context = new ImportJobContext(tableName, jarFile,
+    ImportJobContext context = new ImportJobContext(options.getTableName(), jarFile,
         options, outputPath);
 
     // If we're doing an incremental import, set up the
@@ -522,7 +525,7 @@ public class ImportTool extends BaseSqoopTool {
       deleteTargetDir(context);
     }
 
-    if (null != tableName) {
+    if (null != options.getTableName()) {
       manager.importTable(context);
     } else {
       manager.importQuery(context);
@@ -540,7 +543,8 @@ public class ImportTool extends BaseSqoopTool {
       // For Parquet file, the import action will create hive table directly via
       // kite. So there is no need to do hive import as a post step again.
       if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
-        hiveImport.importTable(tableName, options.getHiveTableName(), false);
+        HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
+        hiveClient.importTable();
       }
     }
 
@@ -609,8 +613,6 @@ public class ImportTool extends BaseSqoopTool {
   @Override
   /** {@inheritDoc} */
   public int run(SqoopOptions options) {
-    HiveImport hiveImport = null;
-
     if (allTables) {
       // We got into this method, but we should be in a subclass.
       // (This method only handles a single table)
@@ -626,12 +628,8 @@ public class ImportTool extends BaseSqoopTool {
     codeGenerator.setManager(manager);
 
     try {
-      if (options.doHiveImport()) {
-        hiveImport = new HiveImport(options, manager, options.getConf(), false);
-      }
-
       // Import a single table (or query) the user specified.
-      importTable(options, options.getTableName(), hiveImport);
+      importTable(options);
     } catch (IllegalArgumentException iea) {
         LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
       rethrowIfRequired(options, iea);
diff --git a/src/test/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializerTest.java b/src/test/org/apache/sqoop/hive/HiveServer2ConnectionFactoryInitializerTest.java
new file mode 100644 (file)
index 0000000..4d2cb2f
--- /dev/null
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class HiveServer2ConnectionFactoryInitializerTest {
+
+  private static final String TEST_HS2_URL = "jdbc:hive2://myhost:10000/default";
+
+  private static final String TEST_HS2_USER = "testuser";
+
+  private static final String TEST_HS2_KEYTAB = "testkeytab";
+
+  private HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer;
+
+  private SqoopOptions sqoopOptions;
+
+  private Configuration configuration;
+
+  private SoftAssertions softly;
+
+  @Before
+  public void before() {
+    connectionFactoryInitializer = new HiveServer2ConnectionFactoryInitializer();
+    sqoopOptions = mock(SqoopOptions.class);
+    configuration = mock(Configuration.class);
+    softly = new SoftAssertions();
+
+    when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER);
+    when(sqoopOptions.getConf()).thenReturn(configuration);
+  }
+
+  @Test
+  public void testCreateJdbcConnectionFactoryWithoutKerberosConfiguredReturnsHiveServer2ConnectionFactory() throws Exception {
+    JdbcConnectionFactory connectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+
+    assertThat(connectionFactory, instanceOf(HiveServer2ConnectionFactory.class));
+  }
+
+  @Test
+  public void testCreateJdbcConnectionFactoryInitializesConnectionStringProperly() throws Exception {
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+    HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+
+    assertEquals(TEST_HS2_URL, connectionFactory.getConnectionString());
+  }
+
+  @Test
+  public void testCreateJdbcConnectionFactoryInitializesConnectionUsernameProperly() throws Exception {
+    HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+
+    assertEquals(TEST_HS2_USER, connectionFactory.getUsername());
+  }
+
+  @Test
+  public void testCreateJdbcConnectionFactoryWithoutHs2UserSpecifiedInitializesConnectionUsernameProperly() throws Exception {
+    when(sqoopOptions.getHs2User()).thenReturn(null);
+    String expectedUsername = UserGroupInformation.getLoginUser().getUserName();
+    HiveServer2ConnectionFactory connectionFactory = (HiveServer2ConnectionFactory) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+
+    assertEquals(expectedUsername, connectionFactory.getUsername());
+  }
+
+  @Test
+  public void testCreateJdbcConnectionFactoryWithKerberosConfiguredReturnsKerberizedConnectionFactoryDecorator() throws Exception {
+    when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB);
+
+    JdbcConnectionFactory connectionFactory = connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+
+    assertThat(connectionFactory, instanceOf(KerberizedConnectionFactoryDecorator.class));
+  }
+
+  @Test
+  public void testCreateJdbcConnectionFactoryWithKerberosConfiguredInitializesDecoratorProperly() throws Exception {
+    when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB);
+
+    KerberizedConnectionFactoryDecorator connectionFactory = (KerberizedConnectionFactoryDecorator) connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions);
+
+    softly.assertThat(connectionFactory.getDecorated()).isInstanceOf(HiveServer2ConnectionFactory.class);
+    softly.assertThat(connectionFactory.getAuthenticator().getConfiguration()).isSameAs(configuration);
+    softly.assertThat(connectionFactory.getAuthenticator().getPrincipal()).isEqualTo(TEST_HS2_USER);
+    softly.assertThat(connectionFactory.getAuthenticator().getKeytabLocation()).isEqualTo(TEST_HS2_KEYTAB);
+
+    softly.assertAll();
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/org/apache/sqoop/hive/TestHiveClientFactory.java b/src/test/org/apache/sqoop/hive/TestHiveClientFactory.java
new file mode 100644 (file)
index 0000000..a3c2dc9
--- /dev/null
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.apache.sqoop.manager.ConnManager;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHiveClientFactory {
+
+  private static final String TEST_HS2_URL = "jdbc:hive2://myhost:10000/default";
+
+  private static final String TEST_TABLE_NAME = "testTableName";
+
+  private static final String TEST_HIVE_TABLE_NAME = "testHiveTableName";
+
+  private HiveClientFactory hiveClientFactory;
+
+  private ConnManager connectionManager;
+
+  private SqoopOptions sqoopOptions;
+
+  private Configuration configuration;
+
+  private JdbcConnectionFactory jdbcConnectionFactory;
+
+  private HiveServer2ConnectionFactoryInitializer connectionFactoryInitializer;
+
+  private SoftAssertions softly;
+
+  @Before
+  public void before() {
+    connectionFactoryInitializer = mock(HiveServer2ConnectionFactoryInitializer.class);
+    hiveClientFactory = new HiveClientFactory(connectionFactoryInitializer);
+    softly = new SoftAssertions();
+
+    connectionManager = mock(ConnManager.class);
+    sqoopOptions = mock(SqoopOptions.class);
+    configuration = mock(Configuration.class);
+    jdbcConnectionFactory = mock(JdbcConnectionFactory.class);
+
+    when(sqoopOptions.getConf()).thenReturn(configuration);
+    when(sqoopOptions.getTableName()).thenReturn(TEST_TABLE_NAME);
+    when(sqoopOptions.getHiveTableName()).thenReturn(TEST_HIVE_TABLE_NAME);
+  }
+
+  @Test
+  public void testCreateHiveClientCreatesHiveImportWhenHs2UrlIsNotProvided() throws Exception {
+    HiveClient hiveClient = hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
+    assertThat(hiveClient, instanceOf(HiveImport.class));
+  }
+
+  @Test
+  public void testCreateHiveClientInitializesHiveImportProperly() throws Exception {
+    HiveImport hiveImport = (HiveImport) hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
+
+    softly.assertThat(hiveImport.getOptions()).isSameAs(sqoopOptions);
+    softly.assertThat(hiveImport.getConnManager()).isSameAs(connectionManager);
+    softly.assertThat(hiveImport.getConfiguration()).isSameAs(configuration);
+    softly.assertThat(hiveImport.isGenerateOnly()).isFalse();
+    softly.assertAll();
+  }
+
+  @Test
+  public void testCreateHiveClientCreatesHiveServer2ClientWhenHs2UrlIsProvided() throws Exception {
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+    HiveClient hiveClient = hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
+    assertThat(hiveClient, instanceOf(HiveServer2Client.class));
+  }
+
+  @Test
+  public void testCreateHiveClientInitializesHiveServer2ClientProperly() throws Exception {
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+    when(connectionFactoryInitializer.createJdbcConnectionFactory(sqoopOptions)).thenReturn(jdbcConnectionFactory);
+
+    HiveServer2Client hs2Client = (HiveServer2Client) hiveClientFactory.createHiveClient(sqoopOptions, connectionManager);
+
+    softly.assertThat(hs2Client.getSqoopOptions()).isSameAs(sqoopOptions);
+    softly.assertThat(hs2Client.getHs2ConnectionFactory()).isSameAs(jdbcConnectionFactory);
+    softly.assertThat(hs2Client.getTableDefWriter().getOptions()).isSameAs(sqoopOptions);
+    softly.assertThat(hs2Client.getTableDefWriter().getConnManager()).isSameAs(connectionManager);
+    softly.assertThat(hs2Client.getTableDefWriter().getInputTableName()).isEqualTo(TEST_TABLE_NAME);
+    softly.assertThat(hs2Client.getTableDefWriter().getOutputTableName()).isEqualTo(TEST_HIVE_TABLE_NAME);
+    softly.assertThat(hs2Client.getTableDefWriter().getConfiguration()).isSameAs(configuration);
+    softly.assertThat(hs2Client.getTableDefWriter().isCommentsEnabled()).isFalse();
+
+    softly.assertAll();
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/org/apache/sqoop/hive/TestHiveMiniCluster.java b/src/test/org/apache/sqoop/hive/TestHiveMiniCluster.java
new file mode 100644 (file)
index 0000000..419f888
--- /dev/null
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.apache.sqoop.hive.minicluster.AuthenticationConfiguration;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration;
+import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
+import org.apache.sqoop.hive.minicluster.PasswordAuthenticationConfiguration;
+import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestHiveMiniCluster {
+
+  @ClassRule
+  public static MiniKdcInfrastructureRule miniKdcInfrastructure = new MiniKdcInfrastructureRule();
+
+  private static final String TEST_USERNAME = "sqoop";
+
+  private static final String TEST_PASSWORD = "secret";
+
+  @Parameters(name = "config = {0}")
+  public static Iterable<? extends Object> authenticationParameters() {
+    return Arrays.asList(new NoAuthenticationConfiguration(),
+                         new PasswordAuthenticationConfiguration(TEST_USERNAME, TEST_PASSWORD),
+                         new KerberosAuthenticationConfiguration(miniKdcInfrastructure));
+  }
+
+  private static final String CREATE_TABLE_SQL = "CREATE TABLE TestTable (id int)";
+
+  private static final String INSERT_SQL = "INSERT INTO TestTable VALUES (?)";
+
+  private static final String SELECT_SQL = "SELECT * FROM TestTable";
+
+  private static final int TEST_VALUE = 10;
+
+  private final AuthenticationConfiguration authenticationConfiguration;
+
+  private HiveMiniCluster hiveMiniCluster;
+
+  private JdbcConnectionFactory connectionFactory;
+
+  public TestHiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) {
+    this.authenticationConfiguration = authenticationConfiguration;
+  }
+
+  @Before
+  public void before() throws SQLException {
+    hiveMiniCluster = new HiveMiniCluster(authenticationConfiguration);
+    hiveMiniCluster.start();
+
+    connectionFactory = authenticationConfiguration.decorateConnectionFactory(new HiveServer2ConnectionFactory(hiveMiniCluster.getUrl(), TEST_USERNAME, TEST_PASSWORD));
+  }
+
+  @Test
+  public void testInsertedRowCanBeReadFromTable() throws Exception {
+    createTestTable();
+    insertRowIntoTestTable();
+
+    assertEquals(TEST_VALUE, getDataFromTestTable());
+  }
+
+  private void insertRowIntoTestTable() throws SQLException {
+    try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(INSERT_SQL)) {
+      stmnt.setInt(1, TEST_VALUE);
+      stmnt.executeUpdate();
+    }
+  }
+
+  private int getDataFromTestTable() throws SQLException {
+    try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(SELECT_SQL)) {
+      ResultSet resultSet = stmnt.executeQuery();
+      resultSet.next();
+      return resultSet.getInt(1);
+    }
+  }
+
+  private void createTestTable() throws SQLException {
+    try (Connection conn = connectionFactory.createConnection(); PreparedStatement stmnt = conn.prepareStatement(CREATE_TABLE_SQL)) {
+      stmnt.executeUpdate();
+    }
+  }
+
+  @After
+  public void after() {
+    hiveMiniCluster.stop();
+    UserGroupInformation.setConfiguration(new Configuration());
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2Client.java b/src/test/org/apache/sqoop/hive/TestHiveServer2Client.java
new file mode 100644 (file)
index 0000000..0261729
--- /dev/null
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestHiveServer2Client {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private static final String CREATE_TABLE_STATEMENT = "createTableStatement";
+
+  private static final String LOAD_DATA_STATEMENT = "loadDataStatement";
+
+  private static final List<String> TEST_COMMANDS = asList("command1", "command2", "command3");
+
+  private HiveServer2Client hs2Client;
+
+  private HiveServer2Client hs2ClientSpy;
+
+  private SqoopOptions sqoopOptions;
+
+  private TableDefWriter tableDefWriter;
+
+  private JdbcConnectionFactory hs2ConnectionFactory;
+
+  private Connection hs2Connection;
+
+  private PreparedStatement preparedStatement;
+
+  private HiveClientCommon hiveClientCommon;
+
+  private Path finalPath;
+
+  private Configuration configuration;
+
+  @Before
+  public void before() throws Exception {
+    sqoopOptions = mock(SqoopOptions.class);
+    tableDefWriter = mock(TableDefWriter.class);
+    hs2ConnectionFactory = mock(JdbcConnectionFactory.class);
+    hs2Connection = mock(Connection.class);
+    preparedStatement = mock(PreparedStatement.class);
+    hiveClientCommon = mock(HiveClientCommon.class);
+    finalPath = mock(Path.class);
+    configuration = mock(Configuration.class);
+
+    when(sqoopOptions.getConf()).thenReturn(configuration);
+    when(hs2ConnectionFactory.createConnection()).thenReturn(hs2Connection);
+    when(hs2Connection.prepareStatement(anyString())).thenReturn(preparedStatement);
+
+    when(tableDefWriter.getCreateTableStmt()).thenReturn(CREATE_TABLE_STATEMENT);
+    when(tableDefWriter.getLoadDataStmt()).thenReturn(LOAD_DATA_STATEMENT);
+    when(tableDefWriter.getFinalPath()).thenReturn(finalPath);
+
+    hs2Client = new HiveServer2Client(sqoopOptions, tableDefWriter, hs2ConnectionFactory, hiveClientCommon);
+    hs2ClientSpy = spy(hs2Client);
+  }
+
+  @Test
+  public void testImportTableExecutesHiveImportWithCreateTableAndLoadDataCommands() throws Exception {
+    doNothing().when(hs2ClientSpy).executeHiveImport(anyList());
+
+    hs2ClientSpy.importTable();
+
+    verify(hs2ClientSpy, times(1)).executeHiveImport(asList(CREATE_TABLE_STATEMENT, LOAD_DATA_STATEMENT));
+  }
+
+  @Test
+  public void testCreateTableExecutesHiveImportWithCreateTableCommandOnly() throws Exception {
+    doNothing().when(hs2ClientSpy).executeHiveImport(anyList());
+
+    hs2ClientSpy.createTable();
+
+    verify(hs2ClientSpy, times(1)).executeHiveImport(asList(CREATE_TABLE_STATEMENT));
+  }
+
+  @Test
+  public void testExecuteHiveImportInvokesMethodsInCorrectSequence() throws Exception {
+    InOrder inOrder = Mockito.inOrder(hiveClientCommon, hs2ClientSpy);
+    doNothing().when(hs2ClientSpy).executeCommands(TEST_COMMANDS);
+
+    hs2ClientSpy.executeHiveImport(TEST_COMMANDS);
+
+    inOrder.verify(hiveClientCommon).removeTempLogs(configuration, finalPath);
+    inOrder.verify(hiveClientCommon).indexLzoFiles(sqoopOptions, finalPath);
+    inOrder.verify(hs2ClientSpy).executeCommands(TEST_COMMANDS);
+    inOrder.verify(hiveClientCommon).cleanUp(configuration, finalPath);
+  }
+
+  @Test
+  public void testExecuteHiveImportThrowsRuntimeExceptionWhenExecuteCommandsThrows() throws Exception {
+    SQLException sqlException = mock(SQLException.class);
+    doThrow(sqlException).when(hs2ClientSpy).executeCommands(anyList());
+
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("Error executing Hive import.");
+
+    hs2ClientSpy.executeHiveImport(TEST_COMMANDS);
+  }
+
+  @Test
+  public void testExecuteCommandsCreatesExactlyOneConnection() throws Exception {
+    hs2Client.executeCommands(TEST_COMMANDS);
+
+    verify(hs2ConnectionFactory, times(1)).createConnection();
+  }
+
+  @Test
+  public void testExecuteCommandsClosesConnectionWhenStatementExecutionIsSuccessful() throws Exception {
+    hs2Client.executeCommands(TEST_COMMANDS);
+
+    verify(hs2Connection).close();
+  }
+
+  @Test
+  public void testExecuteCommandsClosesConnectionWhenStatementExecutionThrows() throws Exception {
+    when(hs2Connection.prepareStatement(anyString())).thenThrow(new SQLException());
+
+    expectedException.expect(SQLException.class);
+    hs2Client.executeCommands(TEST_COMMANDS);
+
+    verify(hs2Connection).close();
+  }
+
+  @Test
+  public void testExecuteCommandsClosesPreparedStatementsWhenStatementExecutionIsSuccessful() throws Exception {
+    hs2Client.executeCommands(TEST_COMMANDS);
+
+    verify(preparedStatement, times(TEST_COMMANDS.size())).close();
+  }
+
+  @Test
+  public void testExecuteCommandsClosesPreparedStatementWhenStatementExecutionThrows() throws Exception {
+    when(preparedStatement.execute()).thenThrow(new SQLException());
+
+    expectedException.expect(SQLException.class);
+    hs2Client.executeCommands(TEST_COMMANDS);
+
+    verify(preparedStatement).close();
+  }
+
+  @Test
+  public void testExecuteCommandsThrowsWhenCreateConnectionThrows() throws Exception {
+    RuntimeException expected = mock(RuntimeException.class);
+    when(hs2ConnectionFactory.createConnection()).thenThrow(expected);
+
+    expectedException.expect(equalTo(expected));
+    hs2Client.executeCommands(TEST_COMMANDS);
+  }
+
+  @Test
+  public void testExecuteCommandsThrowsWhenPrepareStatementThrows() throws Exception {
+    SQLException expected = mock(SQLException.class);
+    when(hs2Connection.prepareStatement(anyString())).thenThrow(expected);
+
+    expectedException.expect(equalTo(expected));
+    hs2Client.executeCommands(TEST_COMMANDS);
+  }
+
+  @Test
+  public void testExecuteCommandsThrowsWhenExecuteStatementThrows() throws Exception {
+    SQLException expected = mock(SQLException.class);
+    when(preparedStatement.execute()).thenThrow(expected);
+
+    expectedException.expect(equalTo(expected));
+    hs2Client.executeCommands(TEST_COMMANDS);
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java b/src/test/org/apache/sqoop/hive/TestHiveServer2TextImport.java
new file mode 100644 (file)
index 0000000..f6d591b
--- /dev/null
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.KerberosAuthenticationConfiguration;
+import org.apache.sqoop.infrastructure.kerberos.MiniKdcInfrastructureRule;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.HiveServer2TestUtil;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHiveServer2TextImport extends ImportJobTestCase {
+
+  @ClassRule
+  public static MiniKdcInfrastructureRule miniKdcInfrastructure = new MiniKdcInfrastructureRule();
+
+  private HiveMiniCluster hiveMiniCluster;
+
+  private HiveServer2TestUtil hiveServer2TestUtil;
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+    KerberosAuthenticationConfiguration authenticationConfiguration = new KerberosAuthenticationConfiguration(miniKdcInfrastructure);
+    hiveMiniCluster = new HiveMiniCluster(authenticationConfiguration);
+    hiveMiniCluster.start();
+    hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    super.tearDown();
+    hiveMiniCluster.stop();
+  }
+
+  @Test
+  public void testImport() throws Exception {
+    List<Object> columnValues = Arrays.<Object>asList("test", 42, "somestring");
+
+    String[] types = {"VARCHAR(32)", "INTEGER", "CHAR(64)"};
+    createTableWithColTypes(types, toStringArray(columnValues));
+
+    String[] args = new ArgumentArrayBuilder()
+        .withProperty(YarnConfiguration.RM_PRINCIPAL, miniKdcInfrastructure.getTestPrincipal())
+        .withOption("connect", getConnectString())
+        .withOption("table", getTableName())
+        .withOption("hive-import")
+        .withOption("hs2-url", hiveMiniCluster.getUrl())
+        .withOption("split-by", getColName(1))
+        .build();
+
+    runImport(args);
+
+    List<List<Object>> rows = hiveServer2TestUtil.loadRawRowsFromTable(getTableName());
+    assertEquals(columnValues, rows.get(0));
+  }
+
+  private String[] toStringArray(List<Object> columnValues) {
+    String[] result = new String[columnValues.size()];
+
+    for (int i = 0; i < columnValues.size(); i++) {
+      if (columnValues.get(i) instanceof String) {
+        result[i] = StringUtils.wrap((String) columnValues.get(i), '\'');
+      } else {
+        result[i] = columnValues.get(i).toString();
+      }
+    }
+
+    return result;
+  }
+
+}
index 8bdc3be..3ea61f6 100644 (file)
@@ -42,6 +42,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
@@ -248,6 +249,13 @@ public class TestTableDefWriter {
     assertTrue(createTable.contains("`db`.`outputTable`"));
   }
 
+  @Test
+  public void testGetCreateTableStmtDiscardsConnection() throws Exception {
+    writer.getCreateTableStmt();
+
+    verify(connManager).discardConnection(true);
+  }
+
   private void setUpMockConnManager(String tableName, Map<String, Integer> typeMap) {
     when(connManager.getColumnTypes(tableName)).thenReturn(typeMap);
     when(connManager.getColumnNames(tableName)).thenReturn(typeMap.keySet().toArray(new String[]{}));
diff --git a/src/test/org/apache/sqoop/hive/minicluster/AuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/AuthenticationConfiguration.java
new file mode 100644 (file)
index 0000000..4171337
--- /dev/null
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.minicluster;
+
+import org.apache.sqoop.db.JdbcConnectionFactory;
+
+import java.security.PrivilegedAction;
+import java.util.Map;
+
+public interface AuthenticationConfiguration {
+
+  Map<String, String> getAuthenticationConfig();
+
+  String getUrlParams();
+
+  <T> T doAsAuthenticated(PrivilegedAction<T> action);
+
+  void init();
+
+  JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory);
+
+}
diff --git a/src/test/org/apache/sqoop/hive/minicluster/HiveMiniCluster.java b/src/test/org/apache/sqoop/hive/minicluster/HiveMiniCluster.java
new file mode 100644 (file)
index 0000000..19bb760
--- /dev/null
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.minicluster;
+
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.Service;
+import org.apache.hive.service.server.HiveServer2;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+public class HiveMiniCluster {
+
+  private static final Log LOG = LogFactory.getLog(HiveMiniCluster.class.getName());
+
+  private static final String DEFAULT_HOST = "127.0.0.1";
+
+  private static final int DEFAULT_PORT = 10000;
+
+  private final String hostName;
+
+  private final int port;
+
+  private final String tempFolderPath;
+
+  private final AuthenticationConfiguration authenticationConfiguration;
+
+  private final HiveServer2 hiveServer2;
+
+  private HiveConf config;
+
+  public HiveMiniCluster(AuthenticationConfiguration authenticationConfiguration) {
+    this(DEFAULT_HOST, DEFAULT_PORT, authenticationConfiguration);
+  }
+
+  public HiveMiniCluster(String hostname, int port, AuthenticationConfiguration authenticationConfiguration) {
+    this(hostname, port, Files.createTempDir().getAbsolutePath(), authenticationConfiguration);
+  }
+
+  public HiveMiniCluster(String hostname, int port, String tempFolderPath, AuthenticationConfiguration authenticationConfiguration) {
+    this.hostName = hostname;
+    this.port = port;
+    this.tempFolderPath = tempFolderPath;
+    this.authenticationConfiguration = authenticationConfiguration;
+    this.hiveServer2 = new HiveServer2();
+  }
+
+  private void createHiveConf() {
+    config = new HiveConf();
+    config.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, tempFolderPath);
+    config.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, getHostName());
+    config.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, getPort());
+    config.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, getMetastoreConnectUrl());
+
+    for (Map.Entry<String, String> authConfig : authenticationConfiguration.getAuthenticationConfig().entrySet()) {
+      config.set(authConfig.getKey(), authConfig.getValue());
+    }
+  }
+
+  public void start() {
+    try {
+      authenticationConfiguration.init();
+      createHiveConf();
+      createHiveSiteXml();
+      startHiveServer();
+      waitForStartUp();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void createHiveSiteXml() throws IOException {
+    File hiveSiteXmlFile = new File(tempFolderPath,"hive-site.xml");
+    try (OutputStream out = new FileOutputStream(hiveSiteXmlFile)) {
+      config.writeXml(out);
+    }
+
+    HiveConf.setHiveSiteLocation(hiveSiteXmlFile.toURI().toURL());
+  }
+
+  private void startHiveServer() throws Exception {
+    authenticationConfiguration.doAsAuthenticated(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        hiveServer2.init(config);
+        hiveServer2.start();
+        return null;
+      }
+    });
+  }
+
+  public void stop() {
+    hiveServer2.stop();
+    HiveConf.setHiveSiteLocation(null);
+    try {
+      FileUtils.deleteDirectory(new File(tempFolderPath));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public HiveConf getConfig() {
+    return config;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public String getUrl() {
+    return String.format("jdbc:hive2://%s:%d/default%s", hostName, port, authenticationConfiguration.getUrlParams());
+  }
+
+  public String getTempFolderPath() {
+    return tempFolderPath;
+  }
+
+  public String getMetastoreConnectUrl() {
+    return String.format("jdbc:derby:;databaseName=%s/minicluster_metastore_db;create=true", tempFolderPath);
+  }
+
+  public boolean isStarted() {
+    return hiveServer2.getServiceState() == Service.STATE.STARTED;
+  }
+
+  private void waitForStartUp() throws InterruptedException, TimeoutException {
+    final int numberOfAttempts = 500;
+    final long sleepTime = 100;
+    for (int i = 0; i < numberOfAttempts; ++i) {
+      try {
+        LOG.debug("Attempt " + (i + 1) + " to access " + hostName + ":" + port);
+        new Socket(InetAddress.getByName(hostName), port).close();
+        return;
+      } catch (RuntimeException | IOException e) {
+        LOG.debug("Failed to connect to " + hostName + ":" + port, e);
+      }
+
+      Thread.sleep(sleepTime);
+    }
+
+    throw new RuntimeException("Couldn't access new server: " + hostName + ":" + port);
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/hive/minicluster/KerberosAuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/KerberosAuthenticationConfiguration.java
new file mode 100644 (file)
index 0000000..549a8c6
--- /dev/null
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.minicluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.sqoop.authentication.KerberosAuthenticator;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+import org.apache.sqoop.db.decorator.KerberizedConnectionFactoryDecorator;
+import org.apache.sqoop.infrastructure.kerberos.KerberosConfigurationProvider;
+
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KerberosAuthenticationConfiguration implements AuthenticationConfiguration {
+
+  private final KerberosConfigurationProvider kerberosConfig;
+
+  private KerberosAuthenticator authenticator;
+
+  public KerberosAuthenticationConfiguration(KerberosConfigurationProvider kerberosConfig) {
+    this.kerberosConfig = kerberosConfig;
+  }
+
+  @Override
+  public Map<String, String> getAuthenticationConfig() {
+    Map<String, String> result = new HashMap<>();
+
+    result.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, kerberosConfig.getTestPrincipal());
+    result.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB.varname, kerberosConfig.getKeytabFilePath());
+    result.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, HiveAuthFactory.AuthTypes.KERBEROS.toString());
+    result.put(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, HiveAuthFactory.AuthTypes.KERBEROS.toString());
+    result.put(YarnConfiguration.RM_PRINCIPAL, kerberosConfig.getTestPrincipal());
+
+    return result;
+  }
+
+  @Override
+  public String getUrlParams() {
+    return ";principal=" + kerberosConfig.getTestPrincipal();
+  }
+
+  @Override
+  public <T> T doAsAuthenticated(PrivilegedAction<T> action) {
+    return authenticator.authenticate().doAs(action);
+  }
+
+  @Override
+  public void init() {
+    authenticator = createKerberosAuthenticator();
+  }
+
+  @Override
+  public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) {
+    return new KerberizedConnectionFactoryDecorator(connectionFactory, authenticator);
+  }
+
+  private KerberosAuthenticator createKerberosAuthenticator() {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    KerberosAuthenticator result = new KerberosAuthenticator(conf, kerberosConfig.getTestPrincipal(), kerberosConfig.getKeytabFilePath());
+    return result;
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/hive/minicluster/NoAuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/NoAuthenticationConfiguration.java
new file mode 100644 (file)
index 0000000..20502c9
--- /dev/null
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.minicluster;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.Map;
+
+public class NoAuthenticationConfiguration implements AuthenticationConfiguration {
+  @Override
+  public Map<String, String> getAuthenticationConfig() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public String getUrlParams() {
+    return StringUtils.EMPTY;
+  }
+
+  @Override
+  public <T> T doAsAuthenticated(PrivilegedAction<T> action) {
+    return action.run();
+  }
+
+  @Override
+  public void init() {
+    // do nothing
+  }
+
+  @Override
+  public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) {
+    return connectionFactory;
+  }
+}
diff --git a/src/test/org/apache/sqoop/hive/minicluster/PasswordAuthenticationConfiguration.java b/src/test/org/apache/sqoop/hive/minicluster/PasswordAuthenticationConfiguration.java
new file mode 100644 (file)
index 0000000..79881f7
--- /dev/null
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive.minicluster;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.sqoop.db.JdbcConnectionFactory;
+
+import javax.security.sasl.AuthenticationException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS;
+
+public class PasswordAuthenticationConfiguration implements AuthenticationConfiguration {
+
+  private static String TEST_USERNAME;
+
+  private static String TEST_PASSWORD;
+
+  private static final class TestPasswordAuthenticationProvider implements PasswdAuthenticationProvider {
+
+    @Override
+    public void Authenticate(String user, String password) throws AuthenticationException {
+      if (!(TEST_USERNAME.equals(user) && TEST_PASSWORD.equals(password))) {
+        throw new AuthenticationException("Authentication failed!");
+      }
+    }
+  }
+
+  public PasswordAuthenticationConfiguration(String testUsername, String testPassword) {
+    TEST_USERNAME = testUsername;
+    TEST_PASSWORD = testPassword;
+  }
+
+  @Override
+  public Map<String, String> getAuthenticationConfig() {
+    Map<String, String> result = new HashMap<>();
+    result.put(HIVE_SERVER2_AUTHENTICATION.varname, HiveAuthFactory.AuthTypes.CUSTOM.getAuthName());
+    result.put(HIVE_SERVER2_CUSTOM_AUTHENTICATION_CLASS.varname, TestPasswordAuthenticationProvider.class.getName());
+
+    return result;
+  }
+
+  @Override
+  public String getUrlParams() {
+    return StringUtils.EMPTY;
+  }
+
+  @Override
+  public <T> T doAsAuthenticated(PrivilegedAction<T> action) {
+    return action.run();
+  }
+
+  @Override
+  public void init() {
+    //do nothing
+  }
+
+  @Override
+  public JdbcConnectionFactory decorateConnectionFactory(JdbcConnectionFactory connectionFactory) {
+    return connectionFactory;
+  }
+}
diff --git a/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java
new file mode 100644 (file)
index 0000000..7993708
--- /dev/null
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import org.apache.sqoop.hive.HiveServer2ConnectionFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class HiveServer2TestUtil {
+
+  private static final String SELECT_TABLE_QUERY = "SELECT * FROM %s";
+
+  private HiveServer2ConnectionFactory hs2ConnectionFactory;
+
+  public HiveServer2TestUtil(String url) {
+    this(url, null, null);
+  }
+
+  public HiveServer2TestUtil(String url, String username, String password) {
+    hs2ConnectionFactory = new HiveServer2ConnectionFactory(url, username, password);
+  }
+
+  public List<LinkedHashMap<String, Object>> loadRowsFromTable(String tableName) {
+    List<LinkedHashMap<String, Object>> result = new ArrayList<>();
+    try(Connection connection = hs2ConnectionFactory.createConnection();
+        PreparedStatement query = connection.prepareStatement(String.format(SELECT_TABLE_QUERY, tableName))) {
+
+      ResultSet resultSet = query.executeQuery();
+      ResultSetMetaData metaData = resultSet.getMetaData();
+
+      while (resultSet.next()) {
+        LinkedHashMap<String, Object> row = new LinkedHashMap<>();
+        for (int i = 1; i <= metaData.getColumnCount(); i++) {
+          row.put(metaData.getColumnName(i), resultSet.getObject(i));
+        }
+        result.add(row);
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+
+  public List<List<Object>> loadRawRowsFromTable(String tableName) {
+    List<List<Object>> result = new ArrayList<>();
+    List<LinkedHashMap<String, Object>> rowsWithColumnNames = loadRowsFromTable(tableName);
+
+    for (LinkedHashMap<String, Object> rowWithColumnNames : rowsWithColumnNames) {
+      result.add(new ArrayList<>(rowWithColumnNames.values()));
+    }
+
+    return result;
+  }
+
+}
diff --git a/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java b/src/test/org/apache/sqoop/tool/TestHiveServer2OptionValidations.java
new file mode 100644 (file)
index 0000000..4d3f938
--- /dev/null
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.tool;
+
+import org.apache.sqoop.SqoopOptions;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
+import static org.apache.sqoop.SqoopOptions.IncrementalMode.None;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(Parameterized.class)
+public class TestHiveServer2OptionValidations {
+
+  @Parameters(name = "sqoopTool = {0}")
+  public static Iterable<? extends Object> parameters() {
+    return Arrays.asList(
+        new ImportTool(),
+        new ImportAllTablesTool(),
+        new CreateHiveTableTool());
+  }
+
+  private static final String TEST_HS2_URL = "test-hs2-url";
+  private static final String TEST_HS2_USER = "test-hs2-user";
+  private static final String TEST_HS2_KEYTAB = "test-hs2-keytab";
+  private static final String TEST_TABLE = "testtable";
+  private static final String TEST_CONNECTION_STRING = "testconnectstring";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final BaseSqoopTool sqoopTool;
+
+  private SqoopOptions sqoopOptions;
+
+  public TestHiveServer2OptionValidations(BaseSqoopTool sqoopTool) {
+    this.sqoopTool = spy(sqoopTool);
+  }
+
+  @Before
+  public void before() {
+    sqoopOptions = mock(SqoopOptions.class);
+    when(sqoopOptions.getTableName()).thenReturn(TEST_TABLE);
+    when(sqoopOptions.getIncrementalMode()).thenReturn(None);
+    when(sqoopOptions.getConnectString()).thenReturn(TEST_CONNECTION_STRING);
+    when(sqoopOptions.getMapColumnHive()).thenReturn(new Properties());
+
+
+    doReturn(0).when(sqoopTool).getDashPosition(any(String[].class));
+  }
+
+  @Test
+  public void testValidateOptionsThrowsWhenHs2UrlIsUsedWithoutHiveImport() throws Exception {
+    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
+    expectedException.expectMessage("The hs2-url option cannot be used without the hive-import option.");
+
+    when(sqoopOptions.doHiveImport()).thenReturn(false);
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+  @Test
+  public void testValidateOptionsThrowsWhenHs2UrlIsUsedWithHCatalogImport() throws Exception {
+    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
+    expectedException.expectMessage("The hs2-url option cannot be used without the hive-import option.");
+
+    when(sqoopOptions.getHCatTableName()).thenReturn(TEST_TABLE);
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+  @Test
+  public void testValidateOptionsThrowsWhenHs2UserIsUsedWithoutHs2Url() throws Exception {
+    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
+    expectedException.expectMessage("The hs2-user option cannot be used without the hs2-url option.");
+
+    when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_USER);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+  @Test
+  public void testValidateOptionsThrowsWhenHs2KeytabIsUsedWithoutHs2User() throws Exception {
+    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
+    expectedException.expectMessage("The hs2-keytab option cannot be used without the hs2-user option.");
+
+    when(sqoopOptions.getHs2Keytab()).thenReturn(TEST_HS2_KEYTAB);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+  @Test
+  public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImport() throws Exception {
+    when(sqoopOptions.doHiveImport()).thenReturn(true);
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+  @Test
+  public void testValidateOptionsSucceedsWhenHs2UrlIsUsedWithHiveImportAndHs2UserButWithoutHs2Keytab() throws Exception {
+    when(sqoopOptions.doHiveImport()).thenReturn(true);
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+    when(sqoopOptions.getHs2User()).thenReturn(TEST_HS2_URL);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+  @Test
+  public void testValidateOptionsFailsWhenHs2UrlIsUsedWithParquetFormat() throws Exception {
+    expectedException.expect(SqoopOptions.InvalidOptionsException.class);
+    expectedException.expectMessage("The hs2-url option cannot be used with the as-parquetfile option.");
+
+    when(sqoopOptions.doHiveImport()).thenReturn(true);
+    when(sqoopOptions.getHs2Url()).thenReturn(TEST_HS2_URL);
+    when(sqoopOptions.getFileLayout()).thenReturn(ParquetFile);
+
+    sqoopTool.validateOptions(sqoopOptions);
+  }
+
+}
index 1c0cf4d..3bdc5c6 100644 (file)
@@ -23,7 +23,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -33,10 +32,10 @@ import static org.mockito.Mockito.when;
 import java.sql.Connection;
 
 import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
-import org.apache.sqoop.hive.HiveImport;
 import org.apache.avro.Schema;
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.hive.HiveClientFactory;
 import org.apache.sqoop.util.ExpectedLogMessage;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -75,7 +74,7 @@ public class TestImportTool {
     final String actualSchemaString = "actualSchema";
     final String errorMessage = "Import failed";
 
-    ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false));
+    ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false, mock(HiveClientFactory.class)));
 
     doReturn(true).when(importTool).init(any(SqoopOptions.class));
 
@@ -85,7 +84,7 @@ public class TestImportTool {
     when(actualSchema.toString()).thenReturn(actualSchemaString);
 
     AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema);
-    doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class), anyString(), any(HiveImport.class));
+    doThrow(expectedException).when(importTool).importTable(any(SqoopOptions.class));
 
     SqoopOptions sqoopOptions = mock(SqoopOptions.class);
     when(sqoopOptions.doHiveImport()).thenReturn(true);