SQOOP-3376: Test import into external Hive table backed by S3
author    Szabolcs Vasas <vasas@apache.org>
          Wed, 10 Oct 2018 12:41:22 +0000 (14:41 +0200)
committer Szabolcs Vasas <vasas@apache.org>
          Wed, 10 Oct 2018 12:41:22 +0000 (14:41 +0200)
(Boglarka Egyed via Szabolcs Vasas)

src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java
src/test/org/apache/sqoop/testutil/S3TestUtils.java
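
Note: as with the other S3 test cases, this test only runs when real S3 credentials are available; otherwise the assumeNotNull() checks skip it. A typical invocation, assuming the standard Sqoop S3 test properties (s3.bucket.url and s3.generator.command) and the Gradle build of the time, would look something like:

    ./gradlew test --tests org.apache.sqoop.s3.TestS3ExternalHiveTableImport \
        -Ds3.bucket.url=s3a://<your-bucket> \
        -Ds3.generator.command=<credential-generator-command>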

diff --git a/src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java b/src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java
new file mode 100644
index 0000000..0c3161e
--- /dev/null
+++ b/src/test/org/apache/sqoop/s3/TestS3ExternalHiveTableImport.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.HiveServer2TestUtil;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.sqoop.tool.BaseSqoopTool.FMT_PARQUETFILE_ARG;
+import static org.apache.sqoop.tool.BaseSqoopTool.FMT_TEXTFILE_ARG;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestS3ExternalHiveTableImport extends ImportJobTestCase {
+
+    @Parameterized.Parameters(name = "fileFormatArg = {0}, expectedResult = {1}")
+    public static Iterable<? extends Object> parameters() {
+        return Arrays.asList(new Object[] {FMT_TEXTFILE_ARG, S3TestUtils.getExpectedTextOutputAsList()},
+                new Object[] {FMT_PARQUETFILE_ARG, S3TestUtils.getExpectedParquetOutput()});
+    }
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3ExternalHiveTableImport.class.getName());
+
+    private String fileFormatArg;
+
+    private List<String> expectedResult;
+
+    public TestS3ExternalHiveTableImport(String fileFormatArg, List<String> expectedResult) {
+        this.fileFormatArg = fileFormatArg;
+        this.expectedResult = expectedResult;
+    }
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    private HiveMiniCluster hiveMiniCluster;
+
+    private HiveServer2TestUtil hiveServer2TestUtil;
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+        hiveMiniCluster = S3TestUtils.setupS3ExternalHiveTableImportTestCase(s3CredentialGenerator);
+        hiveServer2TestUtil = new HiveServer2TestUtil(hiveMiniCluster.getUrl());
+    }
+
+    @After
+    public void cleanUp() {
+        S3TestUtils.tearDownS3ExternalHiveTableImportTestCase(s3Client);
+        super.tearDown();
+        if (hiveMiniCluster != null) {
+            hiveMiniCluster.stop();
+        }
+    }
+
+    @Test
+    public void testS3ImportIntoExternalHiveTable() throws IOException {
+        String[] args = getExternalHiveTableImportArgs(false);
+        runImport(args);
+
+        List<String> rows = hiveServer2TestUtil.loadCsvRowsFromTable(getTableName());
+        assertEquals(expectedResult, rows);
+    }
+
+    @Test
+    public void testS3CreateAndImportIntoExternalHiveTable() throws IOException {
+        String[] args = getExternalHiveTableImportArgs(true);
+        runImport(args);
+
+        List<String> rows = hiveServer2TestUtil.loadCsvRowsFromTable(S3TestUtils.HIVE_EXTERNAL_TABLE_NAME);
+        assertEquals(expectedResult, rows);
+    }
+
+    private String[] getExternalHiveTableImportArgs(boolean createHiveTable) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, fileFormatArg);
+        builder = S3TestUtils.addExternalHiveTableImportArgs(builder, hiveMiniCluster.getUrl());
+        if (createHiveTable) {
+            builder = S3TestUtils.addCreateHiveTableArgs(builder);
+        }
+        return builder.build();
+    }
+
+}
diff --git a/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java
index 7993708..c0689e6 100644
--- a/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java
+++ b/src/test/org/apache/sqoop/testutil/HiveServer2TestUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.testutil;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.sqoop.hive.HiveServer2ConnectionFactory;
 
 import java.sql.Connection;
@@ -29,6 +30,8 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+import static java.util.stream.Collectors.toList;
+
 public class HiveServer2TestUtil {
 
   private static final String SELECT_TABLE_QUERY = "SELECT * FROM %s";
@@ -75,4 +78,10 @@ public class HiveServer2TestUtil {
     return result;
   }
 
+  public List<String> loadCsvRowsFromTable(String tableName) {
+    return loadRawRowsFromTable(tableName).stream()
+            .map(list -> StringUtils.join(list, ","))
+            .collect(toList());
+  }
+
 }
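
Note: the new loadCsvRowsFromTable() helper joins the columns of each raw row with commas, so a Hive row holding (5, Black Widow, Marvel, 1964) comes back as the single string "5,Black Widow,Marvel,1964". This matches the shape of S3TestUtils.getExpectedTextOutputAsList(), which is what allows the tests above to compare the two lists directly.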
diff --git a/src/test/org/apache/sqoop/testutil/S3TestUtils.java b/src/test/org/apache/sqoop/testutil/S3TestUtils.java
index 0e6ef5b..c9d17bc 100644
--- a/src/test/org/apache/sqoop/testutil/S3TestUtils.java
+++ b/src/test/org/apache/sqoop/testutil/S3TestUtils.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.sqoop.hive.minicluster.HiveMiniCluster;
+import org.apache.sqoop.hive.minicluster.NoAuthenticationConfiguration;
 import org.apache.sqoop.util.FileSystemUtil;
 
 import java.io.IOException;
@@ -44,12 +46,16 @@ public class S3TestUtils {
 
     private static final String TEMPORARY_CREDENTIALS_PROVIDER_CLASS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
 
-    private static final String BUCKET_TEMP_TEST_DIR = "/tmp/sqooptest/";
+    private static final String BUCKET_TEMP_DIR = "/tmp/";
+
+    private static final String EXTERNAL_TABLE_DIR = "/externaldir";
 
     private static final String TARGET_DIR_NAME_PREFIX = "/testdir";
 
     private static final String TEMPORARY_ROOTDIR_SUFFIX = "_temprootdir";
 
+    public static final String HIVE_EXTERNAL_TABLE_NAME = "test_external_table";
+
     private static String targetDirName = TARGET_DIR_NAME_PREFIX;
 
     private static final String[] COLUMN_NAMES = {"ID",  "SUPERHERO", "COMICS", "DEBUT"};
@@ -95,15 +101,20 @@ public class S3TestUtils {
     }
 
     public static Path getTargetDirPath() {
-        String targetPathString = getBucketTempTestDirPath() + getTargetDirName();
+        String targetPathString = getBucketTempDirPath() + getTargetDirName();
         return new Path(targetPathString);
     }
 
-    private static Path getBucketTempTestDirPath() {
-        String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_TEST_DIR;
+    private static Path getBucketTempDirPath() {
+        String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_DIR;
         return new Path(targetPathString);
     }
 
+    public static Path getExternalTableDirPath() {
+        String externalTableDir = getBucketTempDirPath() + EXTERNAL_TABLE_DIR;
+        return new Path(externalTableDir);
+    }
+
     public static void runTestCaseOnlyIfS3CredentialsAreSet(S3CredentialGenerator s3CredentialGenerator) {
         assumeNotNull(s3CredentialGenerator);
         assumeNotNull(s3CredentialGenerator.getS3AccessKey());
@@ -112,7 +123,9 @@ public class S3TestUtils {
 
     public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator) throws IOException {
         Configuration hadoopConf = new Configuration();
-        S3TestUtils.setS3CredentialsInHadoopConf(hadoopConf, s3CredentialGenerator);
+        setS3CredentialsInConf(hadoopConf, s3CredentialGenerator);
+        setHadoopConfigParametersForS3UnitTests(hadoopConf);
+
         FileSystem s3Client = FileSystem.get(hadoopConf);
 
         setUniqueTargetDirName();
@@ -122,16 +135,20 @@ public class S3TestUtils {
         return s3Client;
     }
 
-    private static void setS3CredentialsInHadoopConf(Configuration hadoopConf,
-                                                     S3CredentialGenerator s3CredentialGenerator) {
-        hadoopConf.set("fs.defaultFS", getPropertyBucketUrl());
-        hadoopConf.set(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey());
-        hadoopConf.set(Constants.SECRET_KEY, s3CredentialGenerator.getS3SecretKey());
+    public static void setS3CredentialsInConf(Configuration conf,
+                                              S3CredentialGenerator s3CredentialGenerator) {
+        conf.set(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey());
+        conf.set(Constants.SECRET_KEY, s3CredentialGenerator.getS3SecretKey());
 
         if (s3CredentialGenerator.getS3SessionToken() != null) {
-            hadoopConf.set(Constants.SESSION_TOKEN, s3CredentialGenerator.getS3SessionToken());
-            hadoopConf.set(Constants.AWS_CREDENTIALS_PROVIDER, TEMPORARY_CREDENTIALS_PROVIDER_CLASS);
+            conf.set(Constants.SESSION_TOKEN, s3CredentialGenerator.getS3SessionToken());
+            conf.set(Constants.AWS_CREDENTIALS_PROVIDER, TEMPORARY_CREDENTIALS_PROVIDER_CLASS);
         }
+    }
+
+    private static void setHadoopConfigParametersForS3UnitTests(Configuration hadoopConf) {
+        // Default filesystem needs to be set to S3 for the output verification phase
+        hadoopConf.set("fs.defaultFS", getPropertyBucketUrl());
 
         // FileSystem has a static cache that should be disabled during tests to make sure
         // Sqoop relies on the S3 credentials set via the -D system properties.
@@ -139,6 +156,14 @@ public class S3TestUtils {
         hadoopConf.setBoolean("fs.s3a.impl.disable.cache", true);
     }
 
+    public static HiveMiniCluster setupS3ExternalHiveTableImportTestCase(S3CredentialGenerator s3CredentialGenerator) {
+        HiveMiniCluster hiveMiniCluster = new HiveMiniCluster(new NoAuthenticationConfiguration());
+        hiveMiniCluster.start();
+        S3TestUtils.setS3CredentialsInConf(hiveMiniCluster.getConfig(), s3CredentialGenerator);
+
+        return hiveMiniCluster;
+    }
+
     public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTests(BaseSqoopTestCase testCase,
                                                                              S3CredentialGenerator s3CredentialGenerator) {
         ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
@@ -169,6 +194,20 @@ public class S3TestUtils {
         return builder.build();
     }
 
+    public static ArgumentArrayBuilder addExternalHiveTableImportArgs(ArgumentArrayBuilder builder,
+                                                                      String hs2Url) {
+        return builder
+                .withOption("hive-import")
+                .withOption("hs2-url", hs2Url)
+                .withOption("external-table-dir", getExternalTableDirPath().toString());
+    }
+
+    public static ArgumentArrayBuilder addCreateHiveTableArgs(ArgumentArrayBuilder builder) {
+        return builder
+                .withOption("create-hive-table")
+                .withOption("hive-table", HIVE_EXTERNAL_TABLE_NAME);
+    }
+
     private static Path getTemporaryRootDirPath() {
         return new Path(getTargetDirPath().toString() + TEMPORARY_ROOTDIR_SUFFIX);
     }
@@ -244,6 +283,10 @@ public class S3TestUtils {
         };
     }
 
+    public static List<String> getExpectedTextOutputAsList() {
+        return Arrays.asList(getExpectedTextOutput());
+    }
+
     public static String[] getExpectedExtraTextOutput() {
         return new String[] {
                 "5,Black Widow,Marvel,1964"
@@ -352,15 +395,23 @@ public class S3TestUtils {
         }
     }
 
-    public static void tearDownS3ImportTestCase(FileSystem s3Client) {
+    private static void cleanUpTargetDir(FileSystem s3Client) {
         cleanUpDirectory(s3Client, getTargetDirPath());
         resetTargetDirName();
     }
 
+    public static void tearDownS3ImportTestCase(FileSystem s3Client) {
+        cleanUpTargetDir(s3Client);
+    }
+
     public static void tearDownS3IncrementalImportTestCase(FileSystem s3Client) {
-        cleanUpDirectory(s3Client, getTargetDirPath());
+        cleanUpTargetDir(s3Client);
         cleanUpDirectory(s3Client, getTemporaryRootDirPath());
-        resetTargetDirName();
         System.clearProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY);
     }
+
+    public static void tearDownS3ExternalHiveTableImportTestCase(FileSystem s3Client) {
+        cleanUpTargetDir(s3Client);
+        cleanUpDirectory(s3Client, getExternalTableDirPath());
+    }
 }
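
For reference, the create-and-import scenario covered by testS3CreateAndImportIntoExternalHiveTable corresponds roughly to the following Sqoop invocation; everything in angle brackets is a placeholder, and the fs.s3a.* properties mirror the credentials the tests inject through the Hadoop configuration:

    sqoop import \
        -Dfs.s3a.access.key=<access-key> \
        -Dfs.s3a.secret.key=<secret-key> \
        --connect <jdbc-url> --username <user> --password <password> \
        --table <source-table> \
        --target-dir s3a://<your-bucket>/tmp/testdir \
        --hive-import \
        --hs2-url <hiveserver2-jdbc-url> \
        --external-table-dir s3a://<your-bucket>/tmp/externaldir \
        --create-hive-table \
        --hive-table test_external_table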