SQOOP-3330: Sqoop --append does not work with -Dmapreduce.output.basename
authorSzabolcs Vasas <vasas@apache.org>
Fri, 13 Jul 2018 07:34:38 +0000 (09:34 +0200)
committerSzabolcs Vasas <vasas@apache.org>
Fri, 13 Jul 2018 07:34:38 +0000 (09:34 +0200)
(Eric Lin via Szabolcs Vasas)

src/java/org/apache/sqoop/util/AppendUtils.java
src/test/org/apache/sqoop/TestAppendUtils.java

index fa85280..20c0d13 100644 (file)
@@ -43,7 +43,7 @@ public class AppendUtils {
   private static final String FILEPART_SEPARATOR = "-";
   private static final String FILEEXT_SEPARATOR = ".";
 
-  private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*");
+  public static final String DATA_PART_PATTERN_PREFIX = "part";
 
   private ImportJobContext context = null;
 
@@ -115,7 +115,7 @@ public class AppendUtils {
       for (FileStatus fileStat : existingFiles) {
         if (!fileStat.isDir()) {
           String filename = fileStat.getPath().getName();
-          Matcher mat = DATA_PART_PATTERN.matcher(filename);
+          Matcher mat = getDataFileNamePattern().matcher(filename);
           if (mat.matches()) {
             int thisPart = Integer.parseInt(mat.group(1));
             if (thisPart >= nextPartition) {
@@ -204,7 +204,7 @@ public class AppendUtils {
 
           LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName());
         }
-      } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) {    // move only matching top-level files
+      } else if (getDataFileNamePattern().matcher(sourceFilename).matches()) {    // move only matching top-level files
         do {
           // clear the builder in case this isn't the first iteration
           destFilename.setLength(0);
@@ -276,4 +276,21 @@ public class AppendUtils {
     return new Path(tempDir);
   }
 
+  /**
+   * Return the Pattern of the file name that will end up in the target directory
+   *
+   * Take into account that user might pass mapreduce.output.basename, which should
+   * override the default filename prefix of "part"
+   *
+   * @return Pattern
+   */
+  private Pattern getDataFileNamePattern() {
+    String prefix = context.getOptions().getConf().get("mapreduce.output.basename");
+
+    if(null == prefix || prefix.length() == 0) {
+      prefix = DATA_PART_PATTERN_PREFIX;
+    }
+
+    return Pattern.compile(prefix + ".*-([0-9]{" + PARTITION_DIGITS + "}+).*");
+  }
 }
index f14fc6a..3d66bec 100644 (file)
@@ -131,10 +131,10 @@ public class TestAppendUtils extends ImportJobTestCase {
   }
 
   /** @return FileStatus for data files only. */
-  private FileStatus[] listFiles(FileSystem fs, Path path) throws IOException {
+  private FileStatus[] listFiles(FileSystem fs, Path path, String prefix) throws IOException {
     FileStatus[] fileStatuses = fs.listStatus(path);
     ArrayList files = new ArrayList();
-    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
+    Pattern patt = Pattern.compile(prefix + ".*-([0-9][0-9][0-9][0-9][0-9]).*");
     for (FileStatus fstat : fileStatuses) {
       String fname = fstat.getPath().getName();
       if (!fstat.isDir()) {
@@ -147,6 +147,10 @@ public class TestAppendUtils extends ImportJobTestCase {
     return (FileStatus[]) files.toArray(new FileStatus[files.size()]);
   }
 
+  private FileStatus[] listFiles(FileSystem fs, Path path) throws IOException {
+    return listFiles(fs, path, AppendUtils.DATA_PART_PATTERN_PREFIX);
+  }
+
   private class StatusPathComparator implements Comparator<FileStatus> {
 
     @Override
@@ -183,13 +187,17 @@ public class TestAppendUtils extends ImportJobTestCase {
     }
   }
 
+  public void runAppendTest(ArrayList args, Path outputPath) throws IOException {
+    runAppendTest(args, outputPath, AppendUtils.DATA_PART_PATTERN_PREFIX);
+  }
+
   /**
    * Test for ouput path file-count increase, current files untouched and new
    * correct partition number.
    *
    * @throws IOException
    */
-  public void runAppendTest(ArrayList args, Path outputPath)
+  public void runAppendTest(ArrayList args, Path outputPath, String prefix)
       throws IOException {
 
     try {
@@ -205,7 +213,7 @@ public class TestAppendUtils extends ImportJobTestCase {
       runUncleanImport(argv);
 
       // get current file count
-      FileStatus[] fileStatuses = listFiles(fs, outputPath);
+      FileStatus[] fileStatuses = listFiles(fs, outputPath, prefix);
       Arrays.sort(fileStatuses, new StatusPathComparator());
       int previousFileCount = fileStatuses.length;
 
@@ -223,7 +231,7 @@ public class TestAppendUtils extends ImportJobTestCase {
       runUncleanImport(argv);
 
       // check directory file increase
-      fileStatuses = listFiles(fs, outputPath);
+      fileStatuses = listFiles(fs, outputPath, prefix);
       Arrays.sort(fileStatuses, new StatusPathComparator());
       int currentFileCount = fileStatuses.length;
       assertTrue("Output directory didn't got increased in file count ",
@@ -311,5 +319,25 @@ public class TestAppendUtils extends ImportJobTestCase {
     utils.append();
   }
 
+  /**
+   * Test that when we pass in -Dmapreduce.output.basename=prefix, file should also got appended
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testAppendWithMapreduceOutputBasename() throws IOException {
+    String prefix = "prefix-test";
+
+    ArrayList<String> args = new ArrayList<>();
+    args.add("-D");
+    args.add("mapreduce.output.basename=" + prefix);
+    args.addAll(getOutputlessArgv(false, true, HsqldbTestServer.getFieldNames(), getConf()));
+    String targetDir = getWarehouseDir() + "/tempTargetDirOutputBaseNameTest";
+    args.add("--target-dir");
+    args.add(targetDir);
+
+    Path output = new Path(targetDir);
+    runAppendTest(args, output, prefix);
+  }
 }