SQOOP-3363: Test incremental import with S3
author    Szabolcs Vasas <vasas@apache.org>
Tue, 28 Aug 2018 12:17:55 +0000 (14:17 +0200)
committer Szabolcs Vasas <vasas@apache.org>
Tue, 28 Aug 2018 12:17:55 +0000 (14:17 +0200)
(Boglarka Egyed via Szabolcs Vasas)

18 files changed:
src/java/org/apache/sqoop/util/AppendUtils.java
src/java/org/apache/sqoop/util/FileSystemUtil.java
src/test/org/apache/sqoop/TestAppendUtils.java
src/test/org/apache/sqoop/s3/TestS3AvroImport.java
src/test/org/apache/sqoop/s3/TestS3IncrementalAppendAvroImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3IncrementalAppendParquetImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3IncrementalAppendSequenceFileImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3IncrementalAppendTextImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3IncrementalMergeParquetImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3IncrementalMergeTextImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3ParquetImport.java [new file with mode: 0644]
src/test/org/apache/sqoop/s3/TestS3SequenceFileImport.java
src/test/org/apache/sqoop/s3/TestS3TextImport.java
src/test/org/apache/sqoop/testutil/AvroTestUtils.java
src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java
src/test/org/apache/sqoop/testutil/S3TestUtils.java
src/test/org/apache/sqoop/testutil/SequenceFileTestUtils.java
src/test/org/apache/sqoop/testutil/TextFileTestUtils.java

index 20c0d13..96c4fac 100644
@@ -44,6 +44,7 @@ public class AppendUtils {
   private static final String FILEEXT_SEPARATOR = ".";
 
   public static final String DATA_PART_PATTERN_PREFIX = "part";
+  public static final String MAPREDUCE_OUTPUT_BASENAME_PROPERTY = "mapreduce.output.basename";
 
   private ImportJobContext context = null;
 
@@ -285,7 +286,7 @@ public class AppendUtils {
    * @return Pattern
    */
   private Pattern getDataFileNamePattern() {
-    String prefix = context.getOptions().getConf().get("mapreduce.output.basename");
+    String prefix = context.getOptions().getConf().get(MAPREDUCE_OUTPUT_BASENAME_PROPERTY);
 
     if(null == prefix || prefix.length() == 0) {
       prefix = DATA_PART_PATTERN_PREFIX;
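Background for the hunk above: Hadoop derives output file names from this property, so a map-only import writes "<basename>-m-00000", "<basename>-m-00001", ... where the basename defaults to "part" (the DATA_PART_PATTERN_PREFIX above) and can be overridden with -D mapreduce.output.basename=custom. The sketch below only illustrates that naming rule; it is not the production regex, which this hunk does not show.

    import java.util.regex.Pattern;

    // Illustrative sketch of the file-name matching getDataFileNamePattern()
    // has to perform; the real AppendUtils pattern is not part of this hunk.
    public class BasenamePatternSketch {
        static Pattern dataFilePattern(String configuredBasename) {
            String prefix = (configuredBasename == null || configuredBasename.isEmpty())
                    ? "part"               // default, mirrors DATA_PART_PATTERN_PREFIX
                    : configuredBasename;  // value of mapreduce.output.basename
            return Pattern.compile(Pattern.quote(prefix) + "-[mr]-\\d{5}.*");
        }

        public static void main(String[] args) {
            System.out.println(dataFilePattern("custom").matcher("custom-m-00001").matches()); // true
            System.out.println(dataFilePattern(null).matcher("part-m-00000").matches());       // true
            System.out.println(dataFilePattern("custom").matcher("part-m-00000").matches());   // false
        }
    }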
index 96ec212..bd7cbf8 100644
@@ -62,4 +62,15 @@ public final class FileSystemUtil {
     }
     return result;
   }
+
+  public static List<Path> findFilesWithPathContainingPattern(Path path, Configuration conf, String pattern) throws IOException {
+    List<Path> result = new ArrayList<>();
+    List<Path> filePaths = listFiles(path, conf);
+    for (Path filePath : filePaths) {
+      if (filePath.toString().contains(pattern)) {
+        result.add(filePath);
+      }
+    }
+    return result;
+  }
 }
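The new FileSystemUtil.findFilesWithPathContainingPattern(Path, Configuration, String) helper is what the S3 tests below build their file-existence assertions on. A minimal usage sketch, assuming an s3a filesystem is configured; the bucket path and the "part-m" pattern are illustrative:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.util.FileSystemUtil;

    public class FindFilesSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Collect every file under the target dir whose full path contains "part-m".
            List<Path> mapOutputs = FileSystemUtil.findFilesWithPathContainingPattern(
                    new Path("s3a://example-bucket/tmp/sqooptest/testdir"), conf, "part-m");
            // The fail-if-exists / fail-if-missing checks in S3TestUtils reduce to
            // whether a list like this is empty.
            mapOutputs.forEach(System.out::println);
        }
    }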
index 3d66bec..09cfb19 100644
@@ -43,6 +43,7 @@ import org.apache.sqoop.tool.ImportTool;
 import org.apache.sqoop.util.AppendUtils;
 import org.junit.Test;
 
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -330,7 +331,7 @@ public class TestAppendUtils extends ImportJobTestCase {
 
     ArrayList<String> args = new ArrayList<>();
     args.add("-D");
-    args.add("mapreduce.output.basename=" + prefix);
+    args.add(MAPREDUCE_OUTPUT_BASENAME_PROPERTY + "=" + prefix);
     args.addAll(getOutputlessArgv(false, true, HsqldbTestServer.getFieldNames(), getConf()));
     String targetDir = getWarehouseDir() + "/tempTargetDirOutputBaseNameTest";
     args.add("--target-dir");
index e130c42..7f5f5d6 100644
@@ -60,52 +60,50 @@ public class TestS3AvroImport extends ImportJobTestCase {
     public void setup() throws IOException {
         S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
         super.setUp();
-        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator, this);
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
     }
 
     @After
-    public void clearOutputDir() throws IOException {
-        S3TestUtils.clearTargetDir(s3Client);
-        S3TestUtils.resetTargetDirName();
+    public void cleanUpTargetDir() {
+        S3TestUtils.tearDownS3ImportTestCase(s3Client);
         super.tearDown();
     }
 
-    protected ArgumentArrayBuilder getArgumentArrayBuilder() {
-        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForUnitTests(this, s3CredentialGenerator);
-        return builder;
-    }
-
-
     @Test
-    public void testImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-avrodatafile");
-        String[] args = builder.build();
+    public void testS3ImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws IOException {
+        String[] args = getArgsWithAsAvroDataFileOption();
         runImport(args);
         AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
     }
 
     @Test
-    public void testImportAsAvroDataFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-avrodatafile");
-        builder.withOption("delete-target-dir");
-        String[] args = builder.build();
+    public void testS3ImportAsAvroDataFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
+        String[] args = getArgsWithAsAvroDataFileOption();
         runImport(args);
-        AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
 
+        args = getArgsWithAsAvroDataFileAndDeleteTargetDirOption();
         runImport(args);
+        AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
     }
 
     @Test
-    public void testImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-avrodatafile");
-        String[] args = builder.build();
+    public void testS3ImportAsAvroDataFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
+        String[] args = getArgsWithAsAvroDataFileOption();
         runImport(args);
-        AvroTestUtils.verify(S3TestUtils.getExpectedAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath());
 
         thrown.expect(IOException.class);
         runImport(args);
     }
+
+    private String[] getArgsWithAsAvroDataFileOption() {
+        return S3TestUtils.getArgsForS3UnitTestsWithFileFormatOption(this, s3CredentialGenerator, "as-avrodatafile");
+    }
+
+    private String[] getArgsWithAsAvroDataFileAndDeleteTargetDirOption() {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-avrodatafile");
+        builder.withOption("delete-target-dir");
+        return builder.build();
+    }
 }
diff --git a/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendAvroImport.java b/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendAvroImport.java
new file mode 100644
index 0000000..5faf59e
--- /dev/null
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.AvroTestUtils;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+
+public class TestS3IncrementalAppendAvroImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3IncrementalAppendAvroImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpOutputDirectories() {
+        S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsAvroDataFileWhenNoNewRowIsImported() throws IOException {
+        String[] args = getArgsWithAsAvroDataFileOption(false);
+        runImport(args);
+
+        args = getIncrementalAppendArgsWithAsAvroDataFileOption(false);
+        runImport(args);
+
+        S3TestUtils.failIfOutputFilePathContainingPatternExists(s3Client, MAP_OUTPUT_FILE_00001);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsAvroDataFile() throws IOException {
+        String[] args = getArgsWithAsAvroDataFileOption(false);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgsWithAsAvroDataFileOption(false);
+        runImport(args);
+
+        AvroTestUtils.verify(S3TestUtils.getExpectedExtraAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath(), MAP_OUTPUT_FILE_00001);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsAvroDataFileWithMapreduceOutputBasenameProperty() throws IOException {
+        String[] args = getArgsWithAsAvroDataFileOption(true);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgsWithAsAvroDataFileOption(true);
+        runImport(args);
+
+        AvroTestUtils.verify(S3TestUtils.getExpectedExtraAvroOutput(), s3Client.getConf(), S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_MAP_OUTPUT_FILE_00001);
+    }
+
+    private String[] getArgsWithAsAvroDataFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-avrodatafile");
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+
+    private String[] getIncrementalAppendArgsWithAsAvroDataFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-avrodatafile");
+        builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+}
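S3TestUtils.addIncrementalAppendImportArgs is used by all four append tests, but its body is not part of this diff. Judging by Sqoop's standard incremental options, it presumably chains something like the following onto the builder; this is a hypothetical reconstruction, and the check column and last value are illustrative:

    // Hypothetical reconstruction of the helper, for orientation only.
    private static ArgumentArrayBuilder addIncrementalAppendImportArgs(ArgumentArrayBuilder builder) {
        return builder
                .withOption("incremental", "append")
                .withOption("check-column", "ID")  // illustrative column
                .withOption("last-value", "4");    // illustrative boundary
    }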
diff --git a/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendParquetImport.java b/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendParquetImport.java
new file mode 100644
index 0000000..a4f9864
--- /dev/null
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+import static org.junit.Assert.assertEquals;
+
+public class TestS3IncrementalAppendParquetImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3IncrementalAppendParquetImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpOutputDirectories() {
+        S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsParquetFileWhenNoNewRowIsImported() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        args = getIncrementalAppendArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutput(), result);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsParquetFile() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutputAfterAppend(), result);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsParquetFileWithMapreduceOutputBasenameProperty() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption(true);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgsWithAsParquetFileOption(true);
+        runImport(args);
+
+        S3TestUtils.failIfOutputFilePathContainingPatternDoesNotExists(s3Client, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutputAfterAppend(), result);
+    }
+
+    private String[] getArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-parquetfile");
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+
+    private String[] getIncrementalAppendArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-parquetfile");
+        builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+}
diff --git a/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendSequenceFileImport.java b/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendSequenceFileImport.java
new file mode 100644
index 0000000..d271588
--- /dev/null
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.apache.sqoop.testutil.SequenceFileTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+
+public class TestS3IncrementalAppendSequenceFileImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3IncrementalAppendSequenceFileImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpOutputDirectories() {
+        S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsSequenceFileWhenNoNewRowIsImported() throws Exception {
+        String[] args = getArgsWithAsSequenceFileOption(false);
+        runImport(args);
+
+        args = getIncrementalAppendArgsWithAsSequenceFileOption(false);
+        runImport(args);
+
+        S3TestUtils.failIfOutputFilePathContainingPatternExists(s3Client, MAP_OUTPUT_FILE_00001);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsSequenceFile() throws Exception {
+        String[] args = getArgsWithAsSequenceFileOption(false);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgsWithAsSequenceFileOption(false);
+        runImport(args);
+
+        SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedExtraSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath(), MAP_OUTPUT_FILE_00001);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsSequenceFileWithMapreduceOutputBasenameProperty() throws Exception {
+        String[] args = getArgsWithAsSequenceFileOption(true);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgsWithAsSequenceFileOption(true);
+        runImport(args);
+
+        SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedExtraSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_MAP_OUTPUT_FILE_00001);
+    }
+
+    private String[] getArgsWithAsSequenceFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-sequencefile");
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+
+    private String[] getIncrementalAppendArgsWithAsSequenceFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-sequencefile");
+        builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+}
diff --git a/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendTextImport.java b/src/test/org/apache/sqoop/s3/TestS3IncrementalAppendTextImport.java
new file mode 100644
index 0000000..52d89c7
--- /dev/null
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.apache.sqoop.testutil.TextFileTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+
+public class TestS3IncrementalAppendTextImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3IncrementalAppendTextImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpOutputDirectories() {
+        S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsTextFileWhenNoNewRowIsImported() throws IOException {
+        String[] args = getArgs(false);
+        runImport(args);
+
+        args = getIncrementalAppendArgs(false);
+        runImport(args);
+
+        S3TestUtils.failIfOutputFilePathContainingPatternExists(s3Client, MAP_OUTPUT_FILE_00001);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsTextFile() throws IOException {
+        String[] args = getArgs(false);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgs(false);
+        runImport(args);
+
+        TextFileTestUtils.verify(S3TestUtils.getExpectedExtraTextOutput(), s3Client, S3TestUtils.getTargetDirPath(), MAP_OUTPUT_FILE_00001);
+    }
+
+    @Test
+    public void testS3IncrementalAppendAsTextFileWithMapreduceOutputBasenameProperty() throws IOException {
+        String[] args = getArgs(true);
+        runImport(args);
+
+        S3TestUtils.insertInputDataIntoTable(this, S3TestUtils.getExtraInputData());
+
+        args = getIncrementalAppendArgs(true);
+        runImport(args);
+
+        TextFileTestUtils.verify(S3TestUtils.getExpectedExtraTextOutput(), s3Client, S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_MAP_OUTPUT_FILE_00001);
+    }
+
+    private String[] getArgs(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+
+    private String[] getIncrementalAppendArgs(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
+        builder = S3TestUtils.addIncrementalAppendImportArgs(builder);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+}
diff --git a/src/test/org/apache/sqoop/s3/TestS3IncrementalMergeParquetImport.java b/src/test/org/apache/sqoop/s3/TestS3IncrementalMergeParquetImport.java
new file mode 100644
index 0000000..39238c5
--- /dev/null
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+import static org.junit.Assert.assertEquals;
+
+public class TestS3IncrementalMergeParquetImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3IncrementalMergeParquetImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInitialInputDataForMerge(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpOutputDirectories() {
+        S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3IncrementalMergeAsParquetFileWhenNoNewRowIsImported() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        clearTable(getTableName());
+
+        args = getIncrementalMergeArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutputWithTimestampColumn(this), result);
+    }
+
+    @Test
+    public void testS3IncrementalMergeAsParquetFile() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        clearTable(getTableName());
+
+        S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
+
+        args = getIncrementalMergeArgsWithAsParquetFileOption(false);
+        runImport(args);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutputWithTimestampColumnAfterMerge(this), result);
+    }
+
+    @Test
+    public void testS3IncrementalMergeAsParquetFileWithMapreduceOutputBasenameProperty() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption(true);
+        runImport(args);
+
+        clearTable(getTableName());
+
+        S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
+
+        args = getIncrementalMergeArgsWithAsParquetFileOption(true);
+        runImport(args);
+
+        S3TestUtils.failIfOutputFilePathContainingPatternDoesNotExists(s3Client, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutputWithTimestampColumnAfterMerge(this), result);
+    }
+
+    private String[] getArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-parquetfile");
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+
+    private String[] getIncrementalMergeArgsWithAsParquetFileOption(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-parquetfile");
+        builder = S3TestUtils.addIncrementalMergeImportArgs(builder);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+}
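The merge counterpart, S3TestUtils.addIncrementalMergeImportArgs, is likewise not shown in this diff. Sqoop expresses merge-style incremental imports as the lastmodified mode plus a merge key, and the merge fixtures added to S3TestUtils later in this commit define a DEBUT key column and a RECORD_DATE timestamp column, so a hedged sketch would be:

    // Hypothetical reconstruction; column names follow the merge fixtures in
    // S3TestUtils, and the last-value timestamp is illustrative.
    private static ArgumentArrayBuilder addIncrementalMergeImportArgs(ArgumentArrayBuilder builder) {
        return builder
                .withOption("incremental", "lastmodified")
                .withOption("check-column", "RECORD_DATE")
                .withOption("merge-key", "DEBUT")
                .withOption("last-value", "2018-07-23 15:00:00.000");
    }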
diff --git a/src/test/org/apache/sqoop/s3/TestS3IncrementalMergeTextImport.java b/src/test/org/apache/sqoop/s3/TestS3IncrementalMergeTextImport.java
new file mode 100644
index 0000000..597e3de
--- /dev/null
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.apache.sqoop.testutil.TextFileTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+
+public class TestS3IncrementalMergeTextImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3IncrementalMergeTextImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInitialInputDataForMerge(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpOutputDirectories() {
+        S3TestUtils.tearDownS3IncrementalImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3IncrementalMergeAsTextFileWhenNoNewRowIsImported() throws Exception {
+        String[] args = getArgs(false);
+        runImport(args);
+
+        clearTable(getTableName());
+
+        args = getIncrementalMergeArgs(false);
+        runImport(args);
+
+        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutputBeforeMerge(), s3Client, S3TestUtils.getTargetDirPath(), REDUCE_OUTPUT_FILE_00000);
+    }
+
+    @Test
+    public void testS3IncrementalMergeAsTextFile() throws Exception {
+        String[] args = getArgs(false);
+        runImport(args);
+
+        clearTable(getTableName());
+
+        S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
+
+        args = getIncrementalMergeArgs(false);
+        runImport(args);
+
+        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutputAfterMerge(), s3Client, S3TestUtils.getTargetDirPath(), REDUCE_OUTPUT_FILE_00000);
+    }
+
+    @Test
+    public void testS3IncrementalMergeAsTextFileWithMapreduceOutputBasenameProperty() throws Exception {
+        String[] args = getArgs(true);
+        runImport(args);
+
+        clearTable(getTableName());
+
+        S3TestUtils.insertInputDataIntoTableForMerge(this, S3TestUtils.getNewInputDataForMerge());
+
+        args = getIncrementalMergeArgs(true);
+        runImport(args);
+
+        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutputAfterMerge(), s3Client, S3TestUtils.getTargetDirPath(), S3TestUtils.CUSTOM_REDUCE_OUTPUT_FILE_00000);
+    }
+
+    private String[] getArgs(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+
+    private String[] getIncrementalMergeArgs(boolean withMapreduceOutputBasenameProperty) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
+        builder = S3TestUtils.addIncrementalMergeImportArgs(builder);
+        if (withMapreduceOutputBasenameProperty) {
+            builder.withProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY, S3TestUtils.MAPREDUCE_OUTPUT_BASENAME);
+        }
+        return builder.build();
+    }
+}
diff --git a/src/test/org/apache/sqoop/s3/TestS3ParquetImport.java b/src/test/org/apache/sqoop/s3/TestS3ParquetImport.java
new file mode 100644
index 0000000..c9785d8
--- /dev/null
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.s3;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.DefaultS3CredentialGenerator;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.S3CredentialGenerator;
+import org.apache.sqoop.testutil.S3TestUtils;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestS3ParquetImport extends ImportJobTestCase {
+
+    public static final Log LOG = LogFactory.getLog(
+            TestS3ParquetImport.class.getName());
+
+    private static S3CredentialGenerator s3CredentialGenerator;
+
+    private FileSystem s3Client;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setupS3Credentials() throws IOException {
+        String generatorCommand = S3TestUtils.getGeneratorCommand();
+        if (generatorCommand != null) {
+            s3CredentialGenerator = new DefaultS3CredentialGenerator(generatorCommand);
+        }
+    }
+
+    @Before
+    public void setup() throws IOException {
+        S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
+        super.setUp();
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
+    }
+
+    @After
+    public void cleanUpTargetDir() {
+        S3TestUtils.tearDownS3ImportTestCase(s3Client);
+        super.tearDown();
+    }
+
+    @Test
+    public void testS3ImportAsParquetFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption();
+        runImport(args);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutput(), result);
+    }
+
+    @Test
+    public void testS3ImportAsParquetFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption();
+        runImport(args);
+
+        args = getArgsWithAsParquetFileAndDeleteTargetDirOption();
+        runImport(args);
+
+        List<String> result = new ParquetReader(S3TestUtils.getTargetDirPath(), s3Client.getConf()).readAllInCsvSorted();
+        assertEquals(S3TestUtils.getExpectedParquetOutput(), result);
+    }
+
+    @Test
+    public void testS3ImportAsParquetFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
+        String[] args = getArgsWithAsParquetFileOption();
+        runImport(args);
+
+        thrown.expect(IOException.class);
+        runImport(args);
+    }
+
+    private String[] getArgsWithAsParquetFileOption() {
+        return S3TestUtils.getArgsForS3UnitTestsWithFileFormatOption(this, s3CredentialGenerator, "as-parquetfile");
+    }
+
+    private String[] getArgsWithAsParquetFileAndDeleteTargetDirOption() {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-parquetfile");
+        builder.withOption("delete-target-dir");
+        return builder.build();
+    }
+}
index c17c1c5..bba8b74 100644
@@ -60,52 +60,50 @@ public class TestS3SequenceFileImport extends ImportJobTestCase {
     public void setup() throws IOException {
         S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
         super.setUp();
-        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator, this);
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
     }
 
     @After
-    public void clearOutputDir() throws IOException {
-        S3TestUtils.clearTargetDir(s3Client);
-        S3TestUtils.resetTargetDirName();
+    public void cleanUpTargetDir() {
+        S3TestUtils.tearDownS3ImportTestCase(s3Client);
         super.tearDown();
     }
 
-    protected ArgumentArrayBuilder getArgumentArrayBuilder() {
-        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForUnitTests(this, s3CredentialGenerator);
-        return builder;
-    }
-
-
     @Test
-    public void testImportAsSequenceFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws Exception {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-sequencefile");
-        String[] args = builder.build();
+    public void testS3ImportAsSequenceFileWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws Exception {
+        String[] args = getArgsWithAsSequenceFileOption();
         runImport(args);
         SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
     }
 
     @Test
-    public void testImportAsSequenceFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-sequencefile");
-        builder.withOption("delete-target-dir");
-        String[] args = builder.build();
+    public void testS3ImportAsSequenceFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
+        String[] args = getArgsWithAsSequenceFileOption();
         runImport(args);
-        SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
 
+        args = getArgsWithAsSequenceFileAndDeleteTargetDirOption();
         runImport(args);
+        SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
     }
 
     @Test
-    public void testImportAsSequenceWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-sequencefile");
-        String[] args = builder.build();
+    public void testS3ImportAsSequenceFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws Exception {
+        String[] args = getArgsWithAsSequenceFileOption();
         runImport(args);
-        SequenceFileTestUtils.verify(this, S3TestUtils.getExpectedSequenceFileOutput(), s3Client, S3TestUtils.getTargetDirPath());
 
         thrown.expect(IOException.class);
         runImport(args);
     }
+
+    private String[] getArgsWithAsSequenceFileOption() {
+        return S3TestUtils.getArgsForS3UnitTestsWithFileFormatOption(this, s3CredentialGenerator, "as-sequencefile");
+    }
+
+    private String[] getArgsWithAsSequenceFileAndDeleteTargetDirOption() {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(this,
+                s3CredentialGenerator, "as-sequencefile");
+        builder.withOption("delete-target-dir");
+        return builder.build();
+    }
 }
index 60e2cd3..114f97c 100644
@@ -60,81 +60,82 @@ public class TestS3TextImport extends ImportJobTestCase {
     public void setup() throws IOException {
         S3TestUtils.runTestCaseOnlyIfS3CredentialsAreSet(s3CredentialGenerator);
         super.setUp();
-        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator, this);
+        S3TestUtils.createTestTableFromInputData(this);
+        s3Client = S3TestUtils.setupS3ImportTestCase(s3CredentialGenerator);
     }
 
     @After
-    public void clearOutputDir() throws IOException {
-        S3TestUtils.clearTargetDir(s3Client);
-        S3TestUtils.resetTargetDirName();
+    public void cleanUpTargetDir() {
+        S3TestUtils.tearDownS3ImportTestCase(s3Client);
         super.tearDown();
     }
 
-    protected ArgumentArrayBuilder getArgumentArrayBuilder() {
-        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForUnitTests(this, s3CredentialGenerator);
-        return builder;
-    }
-
     @Test
     public void testImportWithoutDeleteTargetDirOptionWhenTargetDirDoesNotExist() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        String[] args = builder.build();
+        String[] args = getArgs(false);
         runImport(args);
         TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
     }
 
     @Test
     public void testImportWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("delete-target-dir");
-        String[] args = builder.build();
+        String[] args = getArgs(false);
         runImport(args);
-        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
 
+        args = getArgsWithDeleteTargetOption(false);
         runImport(args);
+        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
     }
 
     @Test
     public void testImportWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        String[] args = builder.build();
+        String[] args = getArgs(false);
         runImport(args);
-        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
 
         thrown.expect(IOException.class);
         runImport(args);
     }
 
     @Test
-    public void testImportAsTextFile() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-textfile");
-        String[] args = builder.build();
+    public void testS3ImportAsTextFile() throws IOException {
+        String[] args = getArgs(true);
         runImport(args);
         TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
     }
 
     @Test
-    public void testImportAsTextFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-textfile");
-        builder.withOption("delete-target-dir");
-        String[] args = builder.build();
+    public void testS3ImportAsTextFileWithDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
+        String[] args = getArgs(true);
         runImport(args);
-        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
 
+        args = getArgsWithDeleteTargetOption(true);
         runImport(args);
+        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
     }
 
     @Test
-    public void testImportAsTextFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
-        ArgumentArrayBuilder builder = getArgumentArrayBuilder();
-        builder.withOption("as-textfile");
-        String[] args = builder.build();
+    public void testS3ImportAsTextFileWithoutDeleteTargetDirOptionWhenTargetDirAlreadyExists() throws IOException {
+        String[] args = getArgs(true);
         runImport(args);
-        TextFileTestUtils.verify(S3TestUtils.getExpectedTextOutput(), s3Client, S3TestUtils.getTargetDirPath());
 
         thrown.expect(IOException.class);
         runImport(args);
     }
+
+    private String[] getArgs(boolean withAsTextFileOption) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
+        if (withAsTextFileOption) {
+            builder.withOption("as-textfile");
+        }
+        return builder.build();
+    }
+
+    private String[] getArgsWithDeleteTargetOption(boolean withAsTextFileOption) {
+        ArgumentArrayBuilder builder = S3TestUtils.getArgumentArrayBuilderForS3UnitTests(this, s3CredentialGenerator);
+        builder.withOption("delete-target-dir");
+        if (withAsTextFileOption) {
+            builder.withOption("as-textfile");
+        }
+        return builder.build();
+    }
 }
index 04a8494..111dadb 100644
@@ -76,17 +76,28 @@ public class AvroTestUtils {
 
   public static void verify(String[] expectedResults, Configuration conf, Path tablePath) {
     Path outputFile = new Path(tablePath, OUTPUT_FILE_NAME);
+    readAndVerify(expectedResults, conf, outputFile);
+  }
+
+  public static void verify(String[] expectedResults, Configuration conf, Path tablePath, String outputFileName) {
+    Path outputFile = new Path(tablePath, outputFileName + ".avro");
+    readAndVerify(expectedResults, conf, outputFile);
+  }
 
+  private static void readAndVerify(String[] expectedResults, Configuration conf, Path outputFile) {
     try (DataFileReader<GenericRecord> reader = read(outputFile, conf)) {
       GenericRecord record;
       if (!reader.hasNext() && expectedResults != null && expectedResults.length > 0) {
-        fail("empty file was not expected");
+        fail("Empty file was not expected");
       }
       int i = 0;
       while (reader.hasNext()){
         record = reader.next();
         assertEquals(expectedResults[i++], record.toString());
       }
+      if (expectedResults != null && expectedResults.length > i) {
+        fail("More output data was expected");
+      }
     }
     catch (IOException ioe) {
       LOG.error("Issue with verifying the output", ioe);
index ad2f10a..1070fb8 100644
@@ -81,6 +81,9 @@ public abstract class BaseSqoopTestCase {
 
   private static boolean onPhysicalCluster = false;
 
+  public static final String MAP_OUTPUT_FILE_00001 = "part-m-00001";
+  public static final String REDUCE_OUTPUT_FILE_00000 = "part-r-00000";
+
   /** Base directory for all temporary data. */
   public static final String TEMP_BASE_DIR;
 
@@ -447,6 +450,12 @@ public abstract class BaseSqoopTestCase {
     }
   }
 
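+  /**
+   * Insert all of the given records into the table with the given column
+   * names and types.
+   */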
+  protected void insertRecordsIntoTableWithColTypesAndNames(String[] columns, String[] colTypes, List<List<Object>> records) {
+    for (List<Object> record : records) {
+      insertIntoTable(columns, colTypes, record);
+    }
+  }
+
   protected void insertIntoTable(String[] columns, String[] colTypes, List<Object> record) {
     insertIntoTable(columns, colTypes, toStringArray(record));
   }
@@ -610,6 +619,13 @@ public abstract class BaseSqoopTestCase {
     }
   }
 
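+  /**
+   * Create a table with the given column names and types and insert all of
+   * the given records into it.
+   */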
+  protected void createTableWithRecordsWithColTypesAndNames(String [] columns, String [] colTypes, List<List<Object>> records) {
+    createTableWithColTypesAndNames(columns, colTypes, records.get(0));
+    for (int i = 1; i < records.size(); i++) {
+      insertIntoTable(columns, colTypes, records.get(i));
+    }
+  }
+
   /**
    * Create a table with a single column and put a data element in it.
    * @param colType the type of the column to create
index ceaff3b..7724026 100644 (file)
@@ -24,12 +24,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.sqoop.util.FileSystemUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
+import static java.util.Arrays.asList;
+import static org.apache.sqoop.util.AppendUtils.MAPREDUCE_OUTPUT_BASENAME_PROPERTY;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeNotNull;
 
 public class S3TestUtils {
@@ -39,12 +44,28 @@ public class S3TestUtils {
 
     private static final String TEMPORARY_CREDENTIALS_PROVIDER_CLASS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
 
-    private static final String BUCKET_TEMP_DIR = "/tmp/";
+    private static final String BUCKET_TEMP_TEST_DIR = "/tmp/sqooptest/";
 
-    private static final String TARGET_DIR_NAME_PREFIX = "testdir";
+    private static final String TARGET_DIR_NAME_PREFIX = "/testdir";
+
+    private static final String TEMPORARY_ROOTDIR_SUFFIX = "_temprootdir";
 
     private static String targetDirName = TARGET_DIR_NAME_PREFIX;
 
+    private static final String[] COLUMN_NAMES = {"ID",  "SUPERHERO", "COMICS", "DEBUT"};
+    private static final String[] COLUMN_TYPES = { "INT", "VARCHAR(25)", "VARCHAR(25)", "INT"};
+
+    private static final String[] COLUMN_NAMES_FOR_MERGE = { "DEBUT", "SUPERHERO1", "SUPERHERO2", "RECORD_DATE"};
+    private static final String[] COLUMN_TYPES_FOR_MERGE = { "INT", "VARCHAR(25)", "VARCHAR(25)", "TIMESTAMP"};
+    private static final String INITIAL_TIMESTAMP_FOR_MERGE = "2018-07-23 15:00:00.000";
+    private static final String NEW_TIMESTAMP_FOR_MERGE = "2018-08-16 16:30:09.000";
+    private static final String EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE = "2018-07-23 15:00:00.0";
+    private static final String EXPECTED_NEW_TIMESTAMP_FOR_MERGE = "2018-08-16 16:30:09.0";
+
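+    // Custom output basename and file names used when mapreduce.output.basename is overridden.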
+    public static final String MAPREDUCE_OUTPUT_BASENAME = "custom";
+    public static final String CUSTOM_MAP_OUTPUT_FILE_00001 = MAPREDUCE_OUTPUT_BASENAME + "-m-00001";
+    public static final String CUSTOM_REDUCE_OUTPUT_FILE_00000 = MAPREDUCE_OUTPUT_BASENAME + "-r-00000";
+
     public static final Log LOG = LogFactory.getLog(
             S3TestUtils.class.getName());
 
@@ -65,7 +86,7 @@ public class S3TestUtils {
         targetDirName = targetDirName + "-" + uuid;
     }
 
-    public static void resetTargetDirName() {
+    private static void resetTargetDirName() {
         targetDirName = TARGET_DIR_NAME_PREFIX;
     }
 
@@ -74,7 +95,12 @@ public class S3TestUtils {
     }
 
     public static Path getTargetDirPath() {
-        String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_DIR + getTargetDirName();
+        String targetPathString = getBucketTempTestDirPath() + getTargetDirName();
+        return new Path(targetPathString);
+    }
+
+    private static Path getBucketTempTestDirPath() {
+        String targetPathString = getPropertyBucketUrl() + BUCKET_TEMP_TEST_DIR;
         return new Path(targetPathString);
     }
 
@@ -84,17 +110,14 @@ public class S3TestUtils {
         assumeNotNull(s3CredentialGenerator.getS3SecretKey());
     }
 
-    public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator,
-                                             BaseSqoopTestCase testCase) throws IOException {
-        createTestTableFromInputData(testCase);
-
+    public static FileSystem setupS3ImportTestCase(S3CredentialGenerator s3CredentialGenerator) throws IOException {
         Configuration hadoopConf = new Configuration();
         S3TestUtils.setS3CredentialsInHadoopConf(hadoopConf, s3CredentialGenerator);
         FileSystem s3Client = FileSystem.get(hadoopConf);
 
         setUniqueTargetDirName();
 
-        clearTargetDir(s3Client);
+        cleanUpDirectory(s3Client, getTargetDirPath());
 
         return s3Client;
     }
@@ -111,8 +134,8 @@ public class S3TestUtils {
         }
     }
 
-    public static ArgumentArrayBuilder getArgumentArrayBuilderForUnitTests(BaseSqoopTestCase testCase,
-                                                                           S3CredentialGenerator s3CredentialGenerator) {
+    public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTests(BaseSqoopTestCase testCase,
+                                                                             S3CredentialGenerator s3CredentialGenerator) {
         ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
         return builder.withCommonHadoopFlags()
                 .withProperty(Constants.ACCESS_KEY, s3CredentialGenerator.getS3AccessKey())
@@ -125,6 +148,43 @@ public class S3TestUtils {
                 .withOption("target-dir", getTargetDirPath().toString());
     }
 
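+    // Returns the common S3 argument builder extended with the given file format option.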
+    public static ArgumentArrayBuilder getArgumentArrayBuilderForS3UnitTestsWithFileFormatOption(BaseSqoopTestCase testCase,
+                                                                                                 S3CredentialGenerator s3CredentialGenerator,
+                                                                                                 String fileFormat) {
+        ArgumentArrayBuilder builder = getArgumentArrayBuilderForS3UnitTests(testCase, s3CredentialGenerator);
+        builder.withOption(fileFormat);
+        return builder;
+    }
+
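+    // Convenience variant of the above that builds the argument array directly.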
+    public static String[] getArgsForS3UnitTestsWithFileFormatOption(BaseSqoopTestCase testCase,
+                                                                     S3CredentialGenerator s3CredentialGenerator,
+                                                                     String fileFormat) {
+        ArgumentArrayBuilder builder = getArgumentArrayBuilderForS3UnitTests(testCase, s3CredentialGenerator);
+        builder.withOption(fileFormat);
+        return builder.build();
+    }
+
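+    // The temporary root directory of incremental imports is created next to the target directory.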
+    private static Path getTemporaryRootDirPath() {
+        return new Path(getTargetDirPath().toString() + TEMPORARY_ROOTDIR_SUFFIX);
+    }
+
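+    // Adds the incremental append import options; rows with ID greater than the last value of 4 are imported.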
+    public static ArgumentArrayBuilder addIncrementalAppendImportArgs(ArgumentArrayBuilder builder) {
+        return builder
+                .withOption("incremental", "append")
+                .withOption("check-column", "ID")
+                .withOption("last-value", "4")
+                .withOption("temporary-rootdir", getTemporaryRootDirPath().toString());
+    }
+
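+    // Adds the incremental lastmodified import options, merging on the DEBUT key from the initial timestamp.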
+    public static ArgumentArrayBuilder addIncrementalMergeImportArgs(ArgumentArrayBuilder builder) {
+        return builder
+                .withOption("incremental", "lastmodified")
+                .withOption("check-column", "RECORD_DATE")
+                .withOption("merge-key", "DEBUT")
+                .withOption("last-value", INITIAL_TIMESTAMP_FOR_MERGE)
+                .withOption("temporary-rootdir", getTemporaryRootDirPath().toString());
+    }
+
     private static List<String[]> getInputData() {
         List<String[]> data = new ArrayList<>();
         data.add(new String[]{"1", "'Ironman'", "'Marvel'", "1963"});
@@ -134,15 +194,40 @@ public class S3TestUtils {
         return data;
     }
 
-    private static void createTestTableFromInputData(BaseSqoopTestCase testCase) {
-        String[] names = {"ID",  "SUPERHERO", "COMICS", "DEBUT"};
-        String[] types = { "INT", "VARCHAR(25)", "VARCHAR(25)", "INT"};
+    public static String[] getExtraInputData() {
+        return new String[]{"5", "'Black Widow'", "'Marvel'", "1964"};
+    }
+
+    public static List<List<Object>> getInitialInputDataForMerge() {
+        return Arrays.<List<Object>>asList(
+            Arrays.<Object>asList(1940, "Black Widow", "Falcon", INITIAL_TIMESTAMP_FOR_MERGE),
+            Arrays.<Object>asList(1974, "Iron Fist", "The Punisher", INITIAL_TIMESTAMP_FOR_MERGE));
+    }
+
+    public static List<List<Object>> getNewInputDataForMerge() {
+        return Arrays.<List<Object>>asList(
+                Arrays.<Object>asList(1962, "Spiderman", "Thor", NEW_TIMESTAMP_FOR_MERGE),
+                Arrays.<Object>asList(1974, "Wolverine", "The Punisher", NEW_TIMESTAMP_FOR_MERGE));
+    }
+
+    public static void createTestTableFromInputData(BaseSqoopTestCase testCase) {
         List<String[]> inputData = getInputData();
-        testCase.createTableWithColTypesAndNames(names, types, new String[0]);
-        testCase.insertIntoTable(names, types, inputData.get(0));
-        testCase.insertIntoTable(names, types, inputData.get(1));
-        testCase.insertIntoTable(names, types, inputData.get(2));
-        testCase.insertIntoTable(names, types, inputData.get(3));
+        testCase.createTableWithColTypesAndNames(COLUMN_NAMES, COLUMN_TYPES, new String[0]);
+        for (String[] dataRow : inputData) {
+            insertInputDataIntoTable(testCase, dataRow);
+        }
+    }
+
+    public static void insertInputDataIntoTable(BaseSqoopTestCase testCase, String[] inputData) {
+        testCase.insertIntoTable(COLUMN_NAMES, COLUMN_TYPES, inputData);
+    }
+
+    public static void createTestTableFromInitialInputDataForMerge(BaseSqoopTestCase testCase) {
+        testCase.createTableWithRecordsWithColTypesAndNames(COLUMN_NAMES_FOR_MERGE, COLUMN_TYPES_FOR_MERGE, getInitialInputDataForMerge());
+    }
+
+    public static void insertInputDataIntoTableForMerge(BaseSqoopTestCase testCase, List<List<Object>> inputData) {
+        testCase.insertRecordsIntoTableWithColTypesAndNames(COLUMN_NAMES_FOR_MERGE, COLUMN_TYPES_FOR_MERGE, inputData);
     }
 
     public static String[] getExpectedTextOutput() {
@@ -154,6 +239,27 @@ public class S3TestUtils {
         };
     }
 
+    public static String[] getExpectedExtraTextOutput() {
+        return new String[] {
+                "5,Black Widow,Marvel,1964"
+        };
+    }
+
+    public static String[] getExpectedTextOutputBeforeMerge() {
+        return new String[] {
+                "1940,Black Widow,Falcon," + EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE,
+                "1974,Iron Fist,The Punisher," + EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE
+        };
+    }
+
+    public static String[] getExpectedTextOutputAfterMerge() {
+        return new String[] {
+                "1940,Black Widow,Falcon," + EXPECTED_INITIAL_TIMESTAMP_FOR_MERGE,
+                "1962,Spiderman,Thor," + EXPECTED_NEW_TIMESTAMP_FOR_MERGE,
+                "1974,Wolverine,The Punisher," + EXPECTED_NEW_TIMESTAMP_FOR_MERGE
+        };
+    }
+
     public static String[] getExpectedSequenceFileOutput() {
         return new String[] {
                 "1,Ironman,Marvel,1963\n",
@@ -163,6 +269,12 @@ public class S3TestUtils {
         };
     }
 
+    public static String[] getExpectedExtraSequenceFileOutput() {
+        return new String[] {
+                "5,Black Widow,Marvel,1964\n"
+        };
+    }
+
     public static String[] getExpectedAvroOutput() {
         return new String[] {
                 "{\"ID\": 1, \"SUPERHERO\": \"Ironman\", \"COMICS\": \"Marvel\", \"DEBUT\": 1963}",
@@ -172,14 +284,78 @@ public class S3TestUtils {
         };
     }
 
-    public static void clearTargetDir(FileSystem s3Client) throws IOException{
+    public static String[] getExpectedExtraAvroOutput() {
+        return new String[] {
+                "{\"ID\": 5, \"SUPERHERO\": \"Black Widow\", \"COMICS\": \"Marvel\", \"DEBUT\": 1964}"
+        };
+    }
+
+    public static List<String> getExpectedParquetOutput() {
+        return asList(
+                "1,Ironman,Marvel,1963",
+                "2,Wonder Woman,DC,1941",
+                "3,Batman,DC,1939",
+                "4,Hulk,Marvel,1962");
+    }
+
+    public static List<String> getExpectedParquetOutputAfterAppend() {
+        return asList(
+                "1,Ironman,Marvel,1963",
+                "2,Wonder Woman,DC,1941",
+                "3,Batman,DC,1939",
+                "4,Hulk,Marvel,1962",
+                "5,Black Widow,Marvel,1964");
+    }
+
+    public static List<String> getExpectedParquetOutputWithTimestampColumn(BaseSqoopTestCase testCase) {
+        return asList(
+                "1940,Black Widow,Falcon," + testCase.timeFromString(INITIAL_TIMESTAMP_FOR_MERGE),
+                "1974,Iron Fist,The Punisher," + testCase.timeFromString(INITIAL_TIMESTAMP_FOR_MERGE));
+    }
+
+    public static List<String> getExpectedParquetOutputWithTimestampColumnAfterMerge(BaseSqoopTestCase testCase) {
+        return asList(
+                "1940,Black Widow,Falcon," + testCase.timeFromString(INITIAL_TIMESTAMP_FOR_MERGE),
+                "1962,Spiderman,Thor," + testCase.timeFromString(NEW_TIMESTAMP_FOR_MERGE),
+                "1974,Wolverine,The Punisher," + testCase.timeFromString(NEW_TIMESTAMP_FOR_MERGE));
+    }
+
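+    // Fails the test if any output file path under the target directory contains the given pattern.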
+    public static void failIfOutputFilePathContainingPatternExists(FileSystem s3Client, String pattern) throws IOException {
+        List<Path> outputFilesWithPathContainingPattern = FileSystemUtil.findFilesWithPathContainingPattern(getTargetDirPath(),
+                s3Client.getConf(), pattern);
+        if (!outputFilesWithPathContainingPattern.isEmpty()) {
+            fail("No output file was expected with pattern " + pattern);
+        }
+    }
+
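+    // Fails the test if no output file path under the target directory contains the given pattern.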
+    public static void failIfOutputFilePathContainingPatternDoesNotExists(FileSystem s3Client, String pattern) throws IOException {
+        List<Path> outputFilesWithPathContainingPattern = FileSystemUtil.findFilesWithPathContainingPattern(getTargetDirPath(),
+                s3Client.getConf(), pattern);
+        if (outputFilesWithPathContainingPattern.isEmpty()) {
+            fail("No output file was found with pattern " + pattern);
+        }
+    }
+
+    public static void cleanUpDirectory(FileSystem s3Client, Path directoryPath) {
 
         try {
-            if (s3Client.exists(getTargetDirPath())) {
-                s3Client.delete(getTargetDirPath(), true);
+            if (s3Client.exists(directoryPath)) {
+                s3Client.delete(directoryPath, true);
             }
         } catch (Exception e) {
-            LOG.error("Issue with cleaning up output directory", e);
+            LOG.error("Issue with cleaning up directory", e);
         }
     }
+
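+    // Deletes the target directory and resets the target directory name.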
+    public static void tearDownS3ImportTestCase(FileSystem s3Client) {
+        cleanUpDirectory(s3Client, getTargetDirPath());
+        resetTargetDirName();
+    }
+
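+    // Deletes the target and temporary root directories, resets the target directory name and clears the output basename property.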
+    public static void tearDownS3IncrementalImportTestCase(FileSystem s3Client) {
+        cleanUpDirectory(s3Client, getTargetDirPath());
+        cleanUpDirectory(s3Client, getTemporaryRootDirPath());
+        resetTargetDirName();
+        System.clearProperty(MAPREDUCE_OUTPUT_BASENAME_PROPERTY);
+    }
 }
index ad7576d..32f1d6c 100644 (file)
@@ -50,6 +50,23 @@ public class SequenceFileTestUtils {
      */
     public static void verify(BaseSqoopTestCase testCase, String[] expectedResults, FileSystem fileSystem, Path tablePath) throws Exception{
         String outputFilePathString = tablePath.toString() + OUTPUT_FILE_NAME;
+        readAndVerify(testCase, expectedResults, fileSystem, outputFilePathString);
+    }
+
+    /**
+     * Verify results at the given tablePath.
+     * @param testCase current instance of BaseSqoopTestCase
+     * @param expectedResults string array of expected results
+     * @param fileSystem current fileSystem
+     * @param tablePath path of the output table
+     * @param outputFileName MapReduce output filename
+     */
+    public static void verify(BaseSqoopTestCase testCase, String[] expectedResults, FileSystem fileSystem, Path tablePath, String outputFileName) throws Exception{
+        String outputFilePathString = tablePath.toString() + "/" + outputFileName;
+        readAndVerify(testCase, expectedResults, fileSystem, outputFilePathString);
+    }
+
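+    // Opens the sequence file and asserts that its records match the expected results exactly.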
+    private static void readAndVerify(BaseSqoopTestCase testCase, String[] expectedResults, FileSystem fileSystem, String outputFilePathString) throws Exception {
         Path outputFilePath = new Path(outputFilePathString);
 
         Configuration conf = fileSystem.getConf();
@@ -75,6 +92,10 @@ public class SequenceFileTestUtils {
                 assertEquals(expectedResults[i++], value.toString());
                 hasNextRecord = reader.next(key, value);
             }
+
+            if (expectedResults != null && expectedResults.length > i) {
+                fail("More output data was expected");
+            }
         } catch (IOException ioe) {
             LOG.error("Issue with verifying the output", ioe);
             throw new RuntimeException(ioe);
index df19cb8..118d9e1 100644 (file)
@@ -33,7 +33,7 @@ import static org.junit.Assert.fail;
 
 public class TextFileTestUtils {
 
-    private static final String OUTPUT_FILE_NAME = "/part-m-00000";
+    private static final String DEFAULT_OUTPUT_FILE_NAME = "/part-m-00000";
 
     public static final Log LOG = LogFactory.getLog(
             TextFileTestUtils.class.getName());
@@ -45,7 +45,23 @@ public class TextFileTestUtils {
      * @param tablePath path of the output table
      */
     public static void verify(String[] expectedResults, FileSystem fileSystem, Path tablePath) throws IOException {
-        String outputFilePathString = tablePath.toString() + OUTPUT_FILE_NAME;
+        String outputFilePathString = tablePath.toString() + DEFAULT_OUTPUT_FILE_NAME;
+        readAndVerify(expectedResults, fileSystem, outputFilePathString);
+    }
+
+    /**
+     * Verify results at the given tablePath.
+     * @param expectedResults string array of expected results
+     * @param fileSystem current filesystem
+     * @param tablePath path of the output table
+     * @param outputFileName MapReduce output filename
+     */
+    public static void verify(String[] expectedResults, FileSystem fileSystem, Path tablePath, String outputFileName) {
+        String outputFilePathString = tablePath.toString() + "/" + outputFileName;
+        readAndVerify(expectedResults, fileSystem, outputFilePathString);
+    }
+
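+    // Reads the text output line by line and asserts that it matches the expected results exactly.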
+    private static void readAndVerify(String[] expectedResults, FileSystem fileSystem, String outputFilePathString) {
         Path outputFilePath = new Path(outputFilePathString);
 
         try (BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(outputFilePath), Charset.forName("UTF-8")))) {
@@ -61,6 +77,10 @@ public class TextFileTestUtils {
                 line = br.readLine();
             }
 
+            if (expectedResults != null && expectedResults.length > i) {
+                fail("More output data was expected");
+            }
+
         } catch (IOException ioe) {
             LOG.error("Issue with verifying the output", ioe);
             throw new RuntimeException(ioe);