SQOOP-2976: Flag to expand decimal values to fit AVRO schema
authorBoglarka Egyed <bogi@apache.org>
Wed, 14 Feb 2018 14:09:19 +0000 (15:09 +0100)
committerBoglarka Egyed <bogi@apache.org>
Wed, 14 Feb 2018 14:09:19 +0000 (15:09 +0100)
(Ferenc Szabo via Boglarka Egyed)

12 files changed:
src/java/org/apache/sqoop/avro/AvroUtil.java
src/java/org/apache/sqoop/config/ConfigurationConstants.java
src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
src/test/org/apache/sqoop/TestAvroImport.java
src/test/org/apache/sqoop/manager/hsqldb/TestHsqldbAvroPadding.java [new file with mode: 0644]
src/test/org/apache/sqoop/manager/oracle/OracleAvroPaddingImportTest.java [new file with mode: 0644]
src/test/org/apache/sqoop/manager/sqlserver/SQLServerAvroPaddingImportTest.java [new file with mode: 0644]
src/test/org/apache/sqoop/metastore/TestMetastoreConfigurationParameters.java
src/test/org/apache/sqoop/testutil/ArgumentArrayBuilder.java [new file with mode: 0644]
src/test/org/apache/sqoop/testutil/ArgumentUtils.java [deleted file]
src/test/org/apache/sqoop/testutil/AvroTestUtils.java [new file with mode: 0644]
src/test/org/apache/sqoop/testutil/BaseSqoopTestCase.java

index 1aae8df..caed90e 100644 (file)
@@ -51,6 +51,9 @@ import java.util.Map;
  * The service class provides methods for creating and converting Avro objects.
  */
 public final class AvroUtil {
+
+  public static final String DECIMAL = "decimal";
+
   public static boolean isDecimal(Schema.Field field) {
     return isDecimal(field.schema());
   }
@@ -65,20 +68,54 @@ public final class AvroUtil {
 
       return false;
     } else {
-      return "decimal".equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP));
+      return DECIMAL.equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP));
+    }
+  }
+
+  private static BigDecimal padBigDecimal(BigDecimal bd, Schema schema) {
+    Schema schemaContainingScale = getDecimalSchema(schema);
+    if(schemaContainingScale != null) {
+      int scale = Integer.valueOf(schemaContainingScale.getObjectProp("scale").toString());
+      if (bd.scale() != scale) {
+        return bd.setScale(scale);
+      }
+    }
+    return bd;
+  }
+
+  private static Schema getDecimalSchema(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      for (Schema type : schema.getTypes()) {
+        // search for decimal schema
+        Schema schemaContainingScale = getDecimalSchema(type);
+        if (schemaContainingScale != null) {
+          return schemaContainingScale;
+        }
+      }
+    } else {
+      if(DECIMAL.equals(schema.getProp(LogicalType.LOGICAL_TYPE_PROP))) {
+        return schema;
+      }
     }
+    return null;
   }
 
   /**
    * Convert a Sqoop's Java representation to Avro representation.
    */
-  public static Object toAvro(Object o, Schema.Field field, boolean bigDecimalFormatString) {
-    if (o instanceof BigDecimal && !isDecimal(field)) {
-      if (bigDecimalFormatString) {
-        // Returns a string representation of this without an exponent field.
-        return ((BigDecimal) o).toPlainString();
-      } else {
-        return o.toString();
+  public static Object toAvro(Object o, Schema.Field field, boolean bigDecimalFormatString, boolean bigDecimalPaddingEnabled) {
+
+    if (o instanceof BigDecimal) {
+      if(bigDecimalPaddingEnabled) {
+        o = padBigDecimal((BigDecimal) o, field.schema());
+      }
+      if (!isDecimal(field)) {
+        if (bigDecimalFormatString) {
+          // Returns a string representation of this without an exponent field.
+          return ((BigDecimal) o).toPlainString();
+        } else {
+          return o.toString();
+        }
       }
     } else if (o instanceof Date) {
       return ((Date) o).getTime();
@@ -136,16 +173,21 @@ public final class AvroUtil {
     }
   }
 
+  public static GenericRecord toGenericRecord(Map<String, Object> fieldMap,
+                                              Schema schema, boolean bigDecimalFormatString) {
+    return toGenericRecord(fieldMap, schema, bigDecimalFormatString, false);
+  }
+
   /**
    * Manipulate a GenericRecord instance.
    */
   public static GenericRecord toGenericRecord(Map<String, Object> fieldMap,
-      Schema schema, boolean bigDecimalFormatString) {
+      Schema schema, boolean bigDecimalFormatString, boolean bigDecimalPaddingEnabled) {
     GenericRecord record = new GenericData.Record(schema);
     for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
       String avroColumn = toAvroColumn(entry.getKey());
       Schema.Field field = schema.getField(avroColumn);
-      Object avroObject = toAvro(entry.getValue(), field, bigDecimalFormatString);
+      Object avroObject = toAvro(entry.getValue(), field, bigDecimalFormatString, bigDecimalPaddingEnabled);
       record.put(avroColumn, avroObject);
     }
     return record;
index 7a19a62..2197025 100644 (file)
@@ -106,6 +106,11 @@ public final class ConfigurationConstants {
   public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
 
   /**
+   * Enable padding for avro logical types (decimal support only).
+   */
+  public static final String PROP_ENABLE_AVRO_DECIMAL_PADDING = "sqoop.avro.decimal_padding.enable";
+
+  /**
    * The Configuration property identifying data publisher class.
    */
   public static final String DATA_PUBLISH_CLASS = "sqoop.job.data.publish.class";
index a5e5bf5..1ce1e88 100644 (file)
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.mapreduce;
 
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.lib.LargeObjectLoader;
 import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.avro.Schema;
@@ -44,6 +45,7 @@ public class AvroImportMapper
   private Schema schema;
   private LargeObjectLoader lobLoader;
   private boolean bigDecimalFormatString;
+  private boolean bigDecimalPadding;
 
   @Override
   protected void setup(Context context)
@@ -54,6 +56,7 @@ public class AvroImportMapper
     bigDecimalFormatString = conf.getBoolean(
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    bigDecimalPadding = conf.getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_DECIMAL_PADDING, false);
   }
 
   @Override
@@ -67,7 +70,7 @@ public class AvroImportMapper
       throw new IOException(sqlE);
     }
 
-    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema, bigDecimalFormatString);
+    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema, bigDecimalFormatString, bigDecimalPadding);
     wrapper.datum(outKey);
     context.write(wrapper, NullWritable.get());
   }
index 1172fc5..2666f50 100644 (file)
@@ -31,17 +31,14 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.apache.sqoop.testutil.AvroTestUtils;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -365,13 +362,7 @@ public class TestAvroImport extends ImportJobTestCase {
 
   protected DataFileReader<GenericRecord> read(Path filename) throws IOException {
     Configuration conf = new Configuration();
-    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
-      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
-    }
-    FsInput fsInput = new FsInput(filename, conf);
-    DatumReader<GenericRecord> datumReader =
-      new GenericDatumReader<GenericRecord>();
-    return new DataFileReader<GenericRecord>(fsInput, datumReader);
+    return AvroTestUtils.read(filename, conf);
   }
 
   protected void checkSchemaFile(final Schema schema) throws IOException {
diff --git a/src/test/org/apache/sqoop/manager/hsqldb/TestHsqldbAvroPadding.java b/src/test/org/apache/sqoop/manager/hsqldb/TestHsqldbAvroPadding.java
new file mode 100644 (file)
index 0000000..7e42bf1
--- /dev/null
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.hsqldb;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.AvroTestUtils;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.List;
+
+
+public class TestHsqldbAvroPadding extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      TestHsqldbAvroPadding.class.getName());
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    createTestTable();
+  }
+
+  protected void createTestTable() {
+    String[] names = {"ID",  "NAME", "SALARY", "DEPT"};
+    String[] types = { "INT", "VARCHAR(24)", "DECIMAL(20,5)", "VARCHAR(32)"};
+    List<String[]> inputData = AvroTestUtils.getInputData();
+    createTableWithColTypesAndNames(names, types, new String[0]);
+    insertIntoTable(names, types, inputData.get(0));
+    insertIntoTable(names, types, inputData.get(1));
+    insertIntoTable(names, types, inputData.get(2));
+  }
+
+  protected ArgumentArrayBuilder getArgumentArrayBuilder() {
+    ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
+    builder.withOption("connect", getConnectString());
+    return builder;
+  }
+
+  @Test
+  public void testAvroImportWithoutPaddingFails() throws IOException {
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Failure during job; return status 1");
+    String[] args = getArgumentArrayBuilder().build();
+    runImport(args);
+  }
+
+  @Test
+  public void testAvroImportWithPadding() throws IOException {
+    ArgumentArrayBuilder builder = getArgumentArrayBuilder();
+    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
+    String[] args = builder.build();
+    runImport(args);
+    AvroTestUtils.verify(AvroTestUtils.getExpectedResults(), getConf(), getTablePath());
+  }
+}
diff --git a/src/test/org/apache/sqoop/manager/oracle/OracleAvroPaddingImportTest.java b/src/test/org/apache/sqoop/manager/oracle/OracleAvroPaddingImportTest.java
new file mode 100644 (file)
index 0000000..f217f0b
--- /dev/null
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.manager.oracle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.manager.oracle.util.OracleUtils;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.AvroTestUtils;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+  public class OracleAvroPaddingImportTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+      OracleAvroPaddingImportTest.class.getName());
+
+  private  Configuration conf = new Configuration();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Override
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return org.apache.sqoop.manager.oracle.util.OracleUtils.CONNECT_STRING;
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    org.apache.sqoop.manager.oracle.util.OracleUtils.setOracleAuth(opts);
+    return opts;
+  }
+
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    OracleUtils.dropTable(table, getManager());
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    String [] names = {"ID",  "NAME", "SALARY", "DEPT"};
+    String [] types = { "INT", "VARCHAR(24)", "DECIMAL(20,5)", "VARCHAR(32)"};
+    List<String[]> inputData = AvroTestUtils.getInputData();
+    createTableWithColTypesAndNames(names, types, new String[0]);
+    insertIntoTable(names, types, inputData.get(0));
+    insertIntoTable(names, types, inputData.get(1));
+    insertIntoTable(names, types, inputData.get(2));
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      dropTableIfExists(getTableName());
+    } catch (SQLException e) {
+      LOG.warn("Error trying to drop table on tearDown: " + e);
+    }
+    super.tearDown();
+  }
+
+  protected ArgumentArrayBuilder getArgsBuilder() {
+    ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
+    builder.withOption("connect", getConnectString());
+    return builder;
+  }
+
+  @Test
+  public void testAvroImportWithoutPaddingFails() throws IOException {
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Failure during job; return status 1");
+    String[] args = getArgsBuilder().build();
+    runImport(args);
+  }
+
+  @Test
+  public void testAvroImportWithPadding() throws IOException {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
+    runImport(builder.build());
+    AvroTestUtils.verify(AvroTestUtils.getExpectedResults(), getConf(), getTablePath());
+  }
+}
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/SQLServerAvroPaddingImportTest.java b/src/test/org/apache/sqoop/manager/sqlserver/SQLServerAvroPaddingImportTest.java
new file mode 100644 (file)
index 0000000..27dc0cd
--- /dev/null
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.sqlserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.AvroTestUtils;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+public class SQLServerAvroPaddingImportTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(
+          SQLServerAvroPaddingImportTest.class.getName());
+
+  private  Configuration conf = new Configuration();
+
+  @Override
+  protected String getConnectString() {
+    return MSSQLTestUtils.CONNECT_STRING;
+  }
+
+  @Override
+  protected Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions options = new SqoopOptions();
+    options.setConnectString(MSSQLTestUtils.CONNECT_STRING);
+    options.setUsername(MSSQLTestUtils.DATABASE_USER);
+    options.setPassword(MSSQLTestUtils.DATABASE_PASSWORD);
+    return  options;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String dropTableIfExistsCommand(String table) {
+    return "DROP TABLE IF EXISTS " + manager.escapeTableName(table);
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    String [] names = {"ID",  "NAME", "SALARY", "DEPT"};
+    String [] types = { "INT", "VARCHAR(24)", "DECIMAL(20,5)", "VARCHAR(32)"};
+    List<String[]> inputData = AvroTestUtils.getInputData();
+    createTableWithColTypesAndNames(names, types, new String[0]);
+    insertIntoTable(names, types, inputData.get(0));
+    insertIntoTable(names, types, inputData.get(1));
+    insertIntoTable(names, types, inputData.get(2));
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      dropTableIfExists(getTableName());
+    } catch (SQLException e) {
+      LOG.warn("Error trying to drop table on tearDown: " + e);
+    }
+    super.tearDown();
+  }
+
+  protected ArgumentArrayBuilder getArgsBuilder() {
+    ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
+    builder.withOption("connect", MSSQLTestUtils.CONNECT_STRING);
+    builder.withOption("username", MSSQLTestUtils.DATABASE_USER);
+    builder.withOption("password", MSSQLTestUtils.DATABASE_PASSWORD);
+    return builder;
+  }
+
+  /**
+   * Test for avro import with a number value in the table.
+   * SQL Server stores the values padded in the database, therefore this import should always be successful
+   * (Oracle for instance doesn't pad numbers in the database, therefore that one fails without the
+   * sqoop.avro.decimal_padding.enable property)
+   * @throws IOException
+   */
+  @Test
+  public void testAvroImportWithoutPaddingFails() throws IOException {
+    String[] args = getArgsBuilder().build();
+    runImport(args);
+    String [] expectedResults = AvroTestUtils.getExpectedResults();
+    AvroTestUtils.verify(expectedResults, getConf(), getTablePath());
+  }
+
+  /**
+   * This test covers a different code path than {@link #testAvroImportWithoutPaddingFails()},
+   * since the BigDecimal values are checked and padded by Sqoop in
+   * {@link AvroUtil#padBigDecimal(java.math.BigDecimal, org.apache.avro.Schema)}
+   * No actual padding occurs, as the values coming back from SQL Server are already padded with 0s.
+   * @throws IOException
+   */
+  @Test
+  public void testAvroImportWithPadding() throws IOException {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
+    runImport(builder.build());
+    String [] expectedResults = AvroTestUtils.getExpectedResults();
+    AvroTestUtils.verify(expectedResults, getConf(), getTablePath());
+  }
+
+}
index 391dc33..0f1eb89 100644 (file)
@@ -18,9 +18,9 @@
 
 package org.apache.sqoop.metastore;
 
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.Sqoop;
-import org.apache.sqoop.testutil.Argument;
 import org.apache.sqoop.tool.JobTool;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -33,12 +33,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static org.apache.sqoop.testutil.Argument.from;
-import static org.apache.sqoop.testutil.Argument.fromPair;
-import static org.apache.sqoop.testutil.ArgumentUtils.createArgumentArray;
-import static org.apache.sqoop.testutil.ArgumentUtils.createArgumentArrayFromProperties;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -74,8 +68,9 @@ public class TestMetastoreConfigurationParameters {
 
     @Test
     public void testJobToolWithAutoConnectDisabledFails() throws IOException {
-        Argument autoConnectProperty = fromPair("sqoop.metastore.client.enable.autoconnect", "false");
-        String[] arguments = createArgumentArrayFromProperties(singleton(autoConnectProperty));
+        ArgumentArrayBuilder builder = new ArgumentArrayBuilder()
+            .withProperty("sqoop.metastore.client.enable.autoconnect", "false");
+        String[] arguments = builder.build();
         assertEquals(STATUS_FAILURE, Sqoop.runSqoop(sqoop, arguments));
     }
 
@@ -92,15 +87,12 @@ public class TestMetastoreConfigurationParameters {
     }
 
     private int runJobToolWithAutoConnectUrlAndCorrectUsernamePasswordSpecified() {
-        Argument url = fromPair("sqoop.metastore.client.autoconnect.url", HsqldbTestServer.getUrl());
-        Argument user = fromPair("sqoop.metastore.client.autoconnect.username", TEST_USER);
-        Argument password = fromPair("sqoop.metastore.client.autoconnect.password", TEST_PASSWORD);
-        Argument listJob = from("list");
-
-        Iterable<Argument> properties = asList(url, user, password);
-        Iterable<Argument> options = singleton(listJob);
-
-        String[] arguments = createArgumentArray(properties, options);
+        ArgumentArrayBuilder builder = new ArgumentArrayBuilder()
+            .withProperty("sqoop.metastore.client.autoconnect.url", HsqldbTestServer.getUrl())
+            .withProperty("sqoop.metastore.client.autoconnect.username", TEST_USER)
+            .withProperty("sqoop.metastore.client.autoconnect.password", TEST_PASSWORD)
+            .withOption("list");
+        String[] arguments = builder.build();
         return Sqoop.runSqoop(sqoop, arguments);
     }
 
diff --git a/src/test/org/apache/sqoop/testutil/ArgumentArrayBuilder.java b/src/test/org/apache/sqoop/testutil/ArgumentArrayBuilder.java
new file mode 100644 (file)
index 0000000..00ce4fe
--- /dev/null
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+public class ArgumentArrayBuilder {
+
+  private static final String PROPERTY_PREFIX = "-D";
+
+  private static final String OPTION_PREFIX = "--";
+  public static final String TOOL_ARG_SEPARATOR = "--";
+
+  private List<Argument> properties;
+
+  private List<Argument> options;
+
+  private List<Argument> toolOptions;
+
+  private boolean withCommonHadoopFlags;
+
+  public ArgumentArrayBuilder() {
+    properties = new ArrayList<>();
+    options = new ArrayList<>();
+    toolOptions = new ArrayList<>();
+  }
+
+  public ArgumentArrayBuilder withProperty(String name, String value) {
+    properties.add(new Argument(name, value));
+    return this;
+  }
+
+  public ArgumentArrayBuilder withProperty(String name) {
+    properties.add(new Argument(name));
+    return this;
+  }
+
+  public ArgumentArrayBuilder withOption(String name, String value) {
+    options.add(new Argument(name, value));
+    return this;
+  }
+
+  public ArgumentArrayBuilder withOption(String name) {
+    options.add(new Argument(name));
+    return this;
+  }
+
+  public ArgumentArrayBuilder withToolOption(String name, String value) {
+    toolOptions.add(new Argument(name, value));
+    return this;
+  }
+
+  public ArgumentArrayBuilder withToolOption(String name) {
+    toolOptions.add(new Argument(name));
+    return this;
+  }
+
+  public ArgumentArrayBuilder with(ArgumentArrayBuilder otherBuilder) {
+    properties.addAll(otherBuilder.properties);
+    options.addAll(otherBuilder.options);
+    return this;
+  }
+
+  public ArgumentArrayBuilder withCommonHadoopFlags(boolean b) {
+    withCommonHadoopFlags = b;
+    return this;
+  }
+
+  public ArgumentArrayBuilder withCommonHadoopFlags() {
+    withCommonHadoopFlags = true;
+    return this;
+  }
+
+  /**
+   * Transforms the given options, properties and toolOptions to the command line format Sqoop expects,
+   * by adding dashes (--) and the capital D letter when it's necessary (in front of properties)
+   * @return String array that can be used to run tests
+   */
+  public String[] build() {
+    List<String> result = new ArrayList<>();
+    if (withCommonHadoopFlags) {
+      CommonArgs.addHadoopFlags(result);
+    }
+    if (CollectionUtils.isNotEmpty(properties)) {
+      Collections.addAll(result, createArgumentArrayFromProperties(properties));
+    }
+    if (CollectionUtils.isNotEmpty(options)) {
+      Collections.addAll(result, createArgumentArrayFromOptions(options));
+    }
+    if (CollectionUtils.isNotEmpty(toolOptions)) {
+      result.add(TOOL_ARG_SEPARATOR);
+      Collections.addAll(result, createArgumentArrayFromOptions(toolOptions));
+    }
+    return result.toArray(new String[result.size()]);
+  }
+
+  private String[] createArgumentArrayFromProperties(List<Argument> properties) {
+    List<String> result = new ArrayList<>();
+    for (Argument property : properties) {
+      result.add(PROPERTY_PREFIX);
+      result.add(property.toString());
+    }
+    return result.toArray(new String[result.size()]);
+  }
+
+  private String[] createArgumentArrayFromOptions(List<Argument> options) {
+    List<String> result = new ArrayList<>();
+    for (Argument option : options) {
+      result.add(OPTION_PREFIX + option.getName());
+      if (!isEmpty(option.getValue())) {
+        result.add(option.getValue());
+      }
+    }
+    return result.toArray(new String[result.size()]);
+  }
+}
\ No newline at end of file
diff --git a/src/test/org/apache/sqoop/testutil/ArgumentUtils.java b/src/test/org/apache/sqoop/testutil/ArgumentUtils.java
deleted file mode 100644 (file)
index 2f95e45..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.testutil;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-
-public final class ArgumentUtils {
-
-  private static final String PROPERTY_PREFIX = "-D";
-
-  private static final String OPTION_PREFIX = "--";
-
-  public static String[] createArgumentArrayFromProperties(Iterable<Argument> properties) {
-    List<String> result = new ArrayList<>();
-    for (Argument property : properties) {
-      result.add(PROPERTY_PREFIX);
-      result.add(property.toString());
-    }
-
-    return result.toArray(new String[result.size()]);
-  }
-
-  public static String[] createArgumentArrayFromOptions(Iterable<Argument> options) {
-    List<String> result = new ArrayList<>();
-    for (Argument option : options) {
-      result.add(OPTION_PREFIX + option.getName());
-      if (!isEmpty(option.getValue())) {
-        result.add(option.getValue());
-      }
-    }
-
-    return result.toArray(new String[result.size()]);
-  }
-
-  public static String[] createArgumentArray(Iterable<Argument> properties, Iterable<Argument> options) {
-    List<String> result = new ArrayList<>();
-    Collections.addAll(result, createArgumentArrayFromProperties(properties));
-    Collections.addAll(result, createArgumentArrayFromOptions(options));
-
-    return result.toArray(new String[result.size()]);
-  }
-
-}
diff --git a/src/test/org/apache/sqoop/testutil/AvroTestUtils.java b/src/test/org/apache/sqoop/testutil/AvroTestUtils.java
new file mode 100644 (file)
index 0000000..75940bf
--- /dev/null
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.testutil;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+public class AvroTestUtils {
+
+  public static List<String[]> getInputData() {
+    List<String[]> data = new ArrayList<>();
+    data.add(new String[]{"1", "'Aaron'", "1000000.05", "'engineering'"});
+    data.add(new String[]{"2", "'Bob'", "400.10", "'sales'"});
+    data.add(new String[]{"3", "'Fred'", "15.23", "'marketing'"});
+    return data;
+  }
+
+  public static String[] getExpectedResults() {
+    return new String[] {
+        "{\"ID\": 1, \"NAME\": \"Aaron\", \"SALARY\": 1000000.05000, \"DEPT\": \"engineering\"}",
+        "{\"ID\": 2, \"NAME\": \"Bob\", \"SALARY\": 400.10000, \"DEPT\": \"sales\"}",
+        "{\"ID\": 3, \"NAME\": \"Fred\", \"SALARY\": 15.23000, \"DEPT\": \"marketing\"}"
+    };
+  }
+
+  public static ArgumentArrayBuilder getBuilderForAvroPaddingTest(BaseSqoopTestCase testCase) {
+    ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
+    return builder.withCommonHadoopFlags(true)
+        .withProperty("sqoop.avro.logical_types.decimal.enable", "true")
+        .withOption("as-avrodatafile")
+        .withOption("warehouse-dir", testCase.getWarehouseDir())
+        .withOption("num-mappers", "1")
+        .withOption("table", testCase.getTableName());
+  }
+
+  public static void verify(String[] expectedResults, Configuration conf, Path tablePath) {
+    Path outputFile = new Path(tablePath, "part-m-00000.avro");
+    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+    try (DataFileReader<GenericRecord> reader = read(outputFile, conf)) {
+      GenericRecord record;
+      if (!reader.hasNext() && expectedResults != null && expectedResults.length > 0) {
+        fail("empty file was not expected");
+      }
+      int i = 0;
+      while (reader.hasNext()){
+        record = reader.next();
+        assertEquals(expectedResults[i++], record.toString());
+      }
+    }
+    catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Return an instance of DataFileReader for the given filename.
+   * @param filename path that we're opening a reader for.
+   * @param conf
+   * @return instance of DataFileReader.
+   * @throws IOException
+   */
+  public static DataFileReader<GenericRecord> read(Path filename, Configuration conf) throws IOException {
+    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    FsInput fsInput = new FsInput(filename, conf);
+    DatumReader<GenericRecord> datumReader =  new GenericDatumReader<>();
+    return new DataFileReader<>(fsInput, datumReader);
+  }
+}
index 588f439..a5f85a0 100644 (file)
@@ -167,7 +167,7 @@ public abstract class BaseSqoopTestCase {
 
   // instance variables populated during setUp, used during tests
   private HsqldbTestServer testServer;
-  private ConnManager manager;
+  protected ConnManager manager;
 
   private static boolean isLog4jConfigured = false;
 
@@ -299,7 +299,8 @@ public abstract class BaseSqoopTestCase {
    */
   protected void dropTableIfExists(String table) throws SQLException {
     Connection conn = getManager().getConnection();
-    PreparedStatement statement = conn.prepareStatement(dropTableIfExistsCommand(table),
+    String dropStatement = dropTableIfExistsCommand(table);
+    PreparedStatement statement = conn.prepareStatement(dropStatement,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     try {
       statement.executeUpdate();
@@ -351,7 +352,6 @@ public abstract class BaseSqoopTestCase {
             columnDefStr += ", ";
           }
         }
-
         createTableStr = "CREATE TABLE " + manager.escapeTableName(newTableName) + "(" + columnDefStr + ")";
         LOG.info("Creating table: " + createTableStr);
         statement = conn.prepareStatement(
@@ -423,22 +423,28 @@ public abstract class BaseSqoopTestCase {
     }
   }
 
-  /**
-   * insert into a table with a set of columns values for a given row.
-   * @param colTypes the types of the columns to make
-   * @param vals the SQL text for each value to insert
-   */
   protected void insertIntoTable(String[] colTypes, String[] vals) {
-    assert colNames != null;
-    assert colNames.length == vals.length;
+    insertIntoTable(null, colTypes, vals);
+  }
+
+  protected void insertIntoTable(String[] columns, String[] colTypes, String[] vals) {
+    assert colTypes != null;
+    assert colTypes.length == vals.length;
 
     Connection conn = null;
     PreparedStatement statement = null;
 
-    String[] colNames = new String[vals.length];
-    for( int i = 0; i < vals.length; i++) {
-      colNames[i] = BASE_COL_NAME + Integer.toString(i);
+    String[] colNames;
+    if (columns == null){
+      colNames = new String[vals.length];
+      for( int i = 0; i < vals.length; i++) {
+        colNames[i] = BASE_COL_NAME + Integer.toString(i);
+      }
     }
+    else {
+      colNames = columns;
+    }
+
     try {
         conn = getManager().getConnection();
         for (int count=0; vals != null && count < vals.length/colTypes.length;