SQOOP-2949: SQL Syntax error when split-by column is of character type and min or...
authorSzabolcs Vasas <vasas@apache.org>
Wed, 31 Oct 2018 15:48:54 +0000 (16:48 +0100)
committerSzabolcs Vasas <vasas@apache.org>
Wed, 31 Oct 2018 15:48:54 +0000 (16:48 +0100)
(Fero Szabo via Szabolcs Vasas)

src/java/org/apache/sqoop/mapreduce/db/TextSplitter.java
src/test/org/apache/sqoop/importjob/SplitByImportTest.java [new file with mode: 0644]
src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java [new file with mode: 0644]

index 22bbfe6..f8d9228 100644 (file)
@@ -167,24 +167,35 @@ public class TextSplitter extends BigDecimalSplitter {
 
     // Convert the BigDecimal splitPoints into their string representations.
     for (BigDecimal bd : splitPoints) {
-      splitStrings.add(commonPrefix + bigDecimalToString(bd));
+      splitStrings.add(escapeSingleQuotesInSql(commonPrefix + bigDecimalToString(bd)));
     }
 
     // Make sure that our user-specified boundaries are the first and last
     // entries in the array.
     if (splitStrings.size() == 0
         || !splitStrings.get(0).equals(commonPrefix + minString)) {
-      splitStrings.add(0, commonPrefix + minString);
+      splitStrings.add(0, escapeSingleQuotesInSql(commonPrefix + minString));
     }
     if (splitStrings.size() == 1
         || !splitStrings.get(splitStrings.size() - 1).equals(
         commonPrefix + maxString)) {
-      splitStrings.add(commonPrefix + maxString);
+      splitStrings.add(escapeSingleQuotesInSql(commonPrefix + maxString));
     }
 
     return splitStrings;
   }
 
+  /**
+   * Return string after escaping single quotes
+   */
+  private String escapeSingleQuotesInSql (String val) {
+    if (val == null) {
+         return null;
+    }
+
+    return val.replaceAll("'", "''");
+  }
+
   private static final BigDecimal ONE_PLACE = new BigDecimal(65536);
 
   // Maximum number of characters to convert. This is to prevent rounding
diff --git a/src/test/org/apache/sqoop/importjob/SplitByImportTest.java b/src/test/org/apache/sqoop/importjob/SplitByImportTest.java
new file mode 100644 (file)
index 0000000..7977c0b
--- /dev/null
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.importjob.configuration.GenericImportJobSplitByTestConfiguration;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.MSSQLServerDatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.MySqlDatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
+import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
+import org.apache.sqoop.util.ParquetReader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class SplitByImportTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(SplitByImportTest.class.getName());
+
+  private Configuration conf = new Configuration();
+
+  private final ImportJobTestConfiguration configuration;
+  private final DatabaseAdapter adapter;
+
+  @Parameters(name = "Adapter: {0}| Config: {1}")
+  public static Iterable<? extends Object> testConfigurations() {
+    GenericImportJobSplitByTestConfiguration testConfiguration = new GenericImportJobSplitByTestConfiguration();
+    return asList(
+        new Object[] {new OracleDatabaseAdapter(), testConfiguration},
+        new Object[] {new PostgresDatabaseAdapter(), testConfiguration},
+        new Object[] {new MSSQLServerDatabaseAdapter(), testConfiguration},
+        new Object[] {new MySqlDatabaseAdapter(), testConfiguration}
+    );
+  }
+
+  public SplitByImportTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration) {
+    this.adapter = adapter;
+    this.configuration = configuration;
+  }
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Override
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return adapter.getConnectionString();
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    adapter.injectConnectionParameters(opts);
+    return opts;
+  }
+
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    adapter.dropTableIfExists(table, getManager());
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    String[] names = configuration.getNames();
+    String[] types = configuration.getTypes();
+    createTableWithColTypesAndNames(names, types, new String[0]);
+    List<String[]> inputData = configuration.getSampleData();
+    for (String[] input  : inputData) {
+      insertIntoTable(names, types, input);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      dropTableIfExists(getTableName());
+    } catch (SQLException e) {
+      LOG.warn("Error trying to drop table on tearDown: " + e);
+    }
+    super.tearDown();
+  }
+
+  private ArgumentArrayBuilder getArgsBuilder() {
+    return new ArgumentArrayBuilder()
+        .withCommonHadoopFlags(true)
+        .withProperty("org.apache.sqoop.splitter.allow_text_splitter","true")
+        .withOption("warehouse-dir", getWarehouseDir())
+        .withOption("num-mappers", "2")
+        .withOption("table", getTableName())
+        .withOption("connect", getConnectString())
+        .withOption("split-by", GenericImportJobSplitByTestConfiguration.NAME_COLUMN)
+        .withOption("as-parquetfile");
+  }
+
+  @Test
+  public void testSplitBy() throws IOException {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    String[] args = builder.build();
+    runImport(args);
+    verifyParquetFile();
+  }
+
+  private void verifyParquetFile() {
+    ParquetReader reader = new ParquetReader(new Path(getWarehouseDir() + "/" + getTableName()), getConf());
+    assertEquals(asList(configuration.getExpectedResults()), reader.readAllInCsvSorted());
+  }
+}
diff --git a/src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java
new file mode 100644 (file)
index 0000000..f137b56
--- /dev/null
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob.configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
+ * therefore when importing into avro, one has to use the padding feature.
+ */
+public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration {
+
+  public static final String NAME_COLUMN = "NAME";
+  public static final char SEPARATOR = ',';
+
+  List<String[]> data = new ArrayList<>();
+  {
+    data.add(new String[]{"ID_1", "Mr T."});
+    data.add(new String[]{"ID_2", "D'Artagnan"});
+    data.add(new String[]{"ID_3", "Jean D'Arc"});
+    data.add(new String[]{"ID_4", "Jeremy Renner"});
+  }
+
+  List<String[]> escapedData = new ArrayList<>();
+  {
+    escapedData.add(new String[]{"'ID_1'", "'Mr T.'"});
+    escapedData.add(new String[]{"'ID_2'", "'D''Artagnan'"});
+    escapedData.add(new String[]{"'ID_3'", "'Jean D''Arc'"});
+    escapedData.add(new String[]{"'ID_4'", "'Jeremy Renner'"});
+  }
+
+  @Override
+  public String[] getTypes() {
+    return new String[]{"VARCHAR(20)", "VARCHAR(20)"};
+  }
+
+  @Override
+  public String[] getNames() {
+    return new String[]{"ID", NAME_COLUMN};
+  }
+
+  @Override
+  public List<String[]> getSampleData() {
+    return new ArrayList<>(escapedData);
+  }
+
+  @Override
+  public String[] getExpectedResults() {
+    return data.stream()
+        .map(element -> StringUtils.join(element, SEPARATOR))
+        .toArray(String[]::new);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+}