SQOOP-3333: Change default behavior of the MS SQL connector to non-resilient.
authorSzabolcs Vasas <vasas@apache.org>
Wed, 20 Jun 2018 10:41:39 +0000 (12:41 +0200)
committerSzabolcs Vasas <vasas@apache.org>
Wed, 20 Jun 2018 10:41:39 +0000 (12:41 +0200)
(Fero Szabo via Szabolcs Vasas)

src/docs/user/connectors.txt
src/java/org/apache/sqoop/manager/ExportJobContext.java
src/java/org/apache/sqoop/manager/SQLServerManager.java
src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java [new file with mode: 0644]
src/test/org/apache/sqoop/manager/sqlserver/SQLServerManagerImportTest.java
src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java [new file with mode: 0644]

index 7c54071..f1c7aeb 100644 (file)
@@ -127,7 +127,7 @@ Argument                                 Description
 ---------------------------------------------------------------------------------
 +\--identity-insert                      Set IDENTITY_INSERT to ON before \
                                          export insert.
-+\--non-resilient+                       Don't attempt to recover failed \
++\--resilient+                           Attempt to recover failed \
                                          export operations.
 +\--schema <name>+                       Scheme name that sqoop should use. \
                                          Default is "dbo".
@@ -144,14 +144,18 @@ You can allow inserts on columns that have identity. For example:
 $ sqoop export ... --export-dir custom_dir --table custom_table -- --identity-insert
 ----
 
-Non-resilient operations
-^^^^^^^^^^^^^^^^^^^^^^^^
+Resilient operations
+^^^^^^^^^^^^^^^^^^^^
 
-You can override the default and not use resilient operations during export.
-This will avoid retrying failed operations. For example:
+You can override the default and use resilient operations during export.
+This will retry failed operations, i.e. if the connection gets dropped by
+SQL Server, the mapper will try to reconnect and continue from where it was before.
+The split-by column has to be specified and it is also required to be unique
+and in ascending order.
+For example:
 
 ----
-$ sqoop export ... --export-dir custom_dir --table custom_table -- --non-resilient
+$ sqoop export ... --export-dir custom_dir --table custom_table -- --resilient
 ----
 
 Schema support
index 773cf74..643f4b1 100644 (file)
@@ -33,6 +33,7 @@ public class ExportJobContext {
   private String jarFile;
   private SqoopOptions options;
   private ConnManager manager;
+  private Class outputFormatClass;
 
   public ExportJobContext(final String table, final String jar,
       final SqoopOptions opts) {
@@ -78,5 +79,12 @@ public class ExportJobContext {
     return this.manager;
   }
 
+  public Class getOutputFormatClass() {
+    return outputFormatClass;
+  }
+
+  public void setOutputFormatClass(Class outputFormatClass) {
+    this.outputFormatClass = outputFormatClass;
+  }
 }
 
index b136087..c98ad2d 100644 (file)
@@ -31,28 +31,22 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.mapreduce.JdbcUpsertExportJob;
-import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
-import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
-import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
-import org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler;
 
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.mapreduce.JdbcExportJob;
 import org.apache.sqoop.mapreduce.JdbcUpdateExportJob;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
 import org.apache.sqoop.util.ExportException;
 import org.apache.sqoop.util.ImportException;
 
 import org.apache.sqoop.cli.RelatedOptions;
-import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
-import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
 import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat;
 
 /**
  * Manages connections to SQLServer databases. Requires the SQLServer JDBC
  * driver.
  */
-public class SQLServerManager
-    extends InformationSchemaManager {
+public class SQLServerManager extends InformationSchemaManager {
 
   public static final String SCHEMA = "schema";
   public static final String TABLE_HINTS = "table-hints";
@@ -62,8 +56,6 @@ public class SQLServerManager
   public static final Log LOG = LogFactory.getLog(
       SQLServerManager.class.getName());
 
-  // Option set in extra-arguments to disable resiliency and use default mode
-  public static final String NON_RESILIENT_OPTION = "non-resilient";
 
   // Option to allow inserts on identity columns
   public static final String IDENTITY_INSERT = "identity-insert";
@@ -88,12 +80,19 @@ public class SQLServerManager
    */
   private boolean identityInserts;
 
+  final SqlServerManagerContextConfigurator formatConfigurator;
+
   public SQLServerManager(final SqoopOptions opts) {
     this(SQLSERVER.getDriverClass(), opts);
   }
 
   public SQLServerManager(final String driver, final SqoopOptions opts) {
+    this(driver, opts, new SqlServerManagerContextConfigurator());
+  }
+
+  public SQLServerManager(final String driver, final SqoopOptions opts, final SqlServerManagerContextConfigurator configurator) {
     super(driver, opts);
+    this.formatConfigurator = configurator;
 
     // Try to parse extra arguments
     try {
@@ -121,7 +120,7 @@ public class SQLServerManager
        * import/export.
        */
       javaType = "String";
-    }else {
+    } else {
       //If none of the above data types match, it returns parent method's
       //status, which can be null.
       javaType = super.toJavaType(sqlType);
@@ -133,8 +132,7 @@ public class SQLServerManager
    * {@inheritDoc}
    */
   @Override
-  public void importTable(
-      org.apache.sqoop.manager.ImportJobContext context)
+  public void importTable(org.apache.sqoop.manager.ImportJobContext context)
       throws IOException, ImportException {
     // We're the correct connection manager
     context.setConnManager(this);
@@ -144,20 +142,9 @@ public class SQLServerManager
     if (tableHints != null) {
       configuration.set(TABLE_HINTS_PROP, tableHints);
     }
-    if (!isNonResilientOperation()) {
-      // Enable connection recovery only if split column is provided
-      SqoopOptions opts = context.getOptions();
-      String splitCol = getSplitColumn(opts, context.getTableName());
-      if (splitCol != null) {
-        // Configure SQLServer table import jobs for connection recovery
-        configureConnectionRecoveryForImport(context);
-      } else {
-        // Set our own input format
-        context.setInputFormat(SqlServerInputFormat.class);
-      }
-    } else {
-      context.setInputFormat(SqlServerInputFormat.class);
-    }
+    String splitColumn = getSplitColumn(context.getOptions(), context.getTableName());
+    context.setInputFormat(SqlServerInputFormat.class);
+    formatConfigurator.configureContextForImport(context, splitColumn);
     super.importTable(context);
   }
 
@@ -165,8 +152,7 @@ public class SQLServerManager
    * Export data stored in HDFS into a table in a database.
    */
   @Override
-  public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
-      throws IOException, ExportException {
+  public void exportTable(org.apache.sqoop.manager.ExportJobContext context) throws IOException, ExportException {
     context.setConnManager(this);
 
     // Propagate table hints to job
@@ -178,15 +164,9 @@ public class SQLServerManager
     // Propagate whether to allow identity inserts to job
     configuration.setBoolean(IDENTITY_INSERT_PROP, identityInserts);
 
-    JdbcExportJob exportJob;
-    if (isNonResilientOperation()) {
-      exportJob = new JdbcExportJob(context, null, null,
-      SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
-    } else {
-      exportJob = new JdbcExportJob(context, null, null,
-        SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
-      configureConnectionRecoveryForExport(context);
-    }
+    formatConfigurator.configureContextForExport(context);
+    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
+        context.getOutputFormatClass(), getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -194,17 +174,15 @@ public class SQLServerManager
   /**
    * {@inheritDoc}
    */
-  public void updateTable(
-      org.apache.sqoop.manager.ExportJobContext context)
+  public void updateTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
-    if (isNonResilientOperation()) {
-      super.updateTable(context);
-    } else {
-      context.setConnManager(this);
+    boolean runAsExportJob = formatConfigurator.configureContextForUpdate(context, this);
+    if (runAsExportJob) {
       JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
-        null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
-      configureConnectionRecoveryForUpdate(context);
+          null, context.getOutputFormatClass(), getParquetJobConfigurator().createParquetExportJobConfigurator());
       exportJob.runExport();
+    } else {
+      super.updateTable(context);
     }
   }
 
@@ -391,99 +369,10 @@ public class SQLServerManager
    */
   public void importQuery(org.apache.sqoop.manager.ImportJobContext context)
       throws IOException, ImportException {
-    if (!isNonResilientOperation()) {
-      // Enable connection recovery only if split column is provided
-      SqoopOptions opts = context.getOptions();
-      String splitCol = getSplitColumn(opts, context.getTableName());
-      if (splitCol != null) {
-        // Configure SQLServer query import jobs for connection recovery
-        configureConnectionRecoveryForImport(context);
-      }
-    }
+    String splitColumn = getSplitColumn(context.getOptions(), context.getTableName());
+    formatConfigurator.configureContextForImport(context, splitColumn);
     super.importQuery(context);
   }
 
-  /**
-   * Configure SQLServer Sqoop Jobs to recover failed connections by using
-   * SQLServerConnectionFailureHandler by default.
-   */
-  protected void configureConnectionRecoveryForImport(
-      org.apache.sqoop.manager.ImportJobContext context) {
-
-    Configuration conf = context.getOptions().getConf();
-
-    // Configure input format class
-    context.setInputFormat(SQLServerDBInputFormat.class);
-
-    // Set connection failure handler and recovery settings
-    // Default settings can be overridden if provided as Configuration
-    // properties by the user
-    if (conf.get(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS)
-        == null) {
-      conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS,
-        SQLServerConnectionFailureHandler.class.getName());
-    }
-  }
-
-  /**
-   * Configure SQLServer Sqoop export Jobs to recover failed connections by
-   * using SQLServerConnectionFailureHandler by default.
-   */
-  protected void configureConnectionRecoveryForExport(
-      org.apache.sqoop.manager.ExportJobContext context) {
-
-    Configuration conf = context.getOptions().getConf();
-
-    // Set connection failure handler and recovery settings
-    // Default settings can be overridden if provided as Configuration
-    // properties by the user
-    String clsFailureHandler = conf.get(
-      SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
-    if (clsFailureHandler == null) {
-      conf.set(
-        SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS,
-        SQLServerConnectionFailureHandler.class.getName());
-    }
-  }
-
-  /**
-   * Configure SQLServer Sqoop Update Jobs to recover connection failures by
-   * using SQLServerConnectionFailureHandler by default.
-   */
-  protected void configureConnectionRecoveryForUpdate(
-      org.apache.sqoop.manager.ExportJobContext context) {
-
-    Configuration conf = context.getOptions().getConf();
-
-    // Set connection failure handler and recovery settings
-    // Default settings can be overridden if provided as Configuration
-    // properties by the user
-    String clsFailureHandler = conf.get(
-      SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
-    if (clsFailureHandler == null) {
-      conf.set(
-        SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS,
-        SQLServerConnectionFailureHandler.class.getName());
-    }
-  }
-
-  /**
-   * Check if the user has requested the operation to be non resilient.
-   */
-  protected boolean isNonResilientOperation() {
-    String [] extraArgs = options.getExtraArgs();
-    if (extraArgs != null) {
-      // Traverse the extra options
-      for (int iArg = 0; iArg < extraArgs.length; ++iArg) {
-        String currentArg = extraArgs[iArg];
-        if (currentArg.startsWith("--")
-          && currentArg.substring(2).equalsIgnoreCase(NON_RESILIENT_OPTION)) {
-          // User has explicitly requested the operation to be non-resilient
-          return true;
-        }
-      }
-    }
-    return false;
-  }
 }
 
diff --git a/src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java b/src/java/org/apache/sqoop/manager/SqlServerManagerContextConfigurator.java
new file mode 100644 (file)
index 0000000..cf58f63
--- /dev/null
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
+import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
+import org.apache.sqoop.mapreduce.db.SQLServerConnectionFailureHandler;
+import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
+
+public class SqlServerManagerContextConfigurator {
+
+  private static final String RESILIENT_OPTION = "resilient";
+
+  /**
+   * Check if the user has requested the operation to be resilient.
+   */
+  private boolean isResilientOperation(SqoopOptions options) {
+    String [] extraArgs = options.getExtraArgs();
+    if (extraArgs != null) {
+      // Traverse the extra options
+      for (int iArg = 0; iArg < extraArgs.length; ++iArg) {
+        String currentArg = extraArgs[iArg];
+        if (currentArg.startsWith("--")
+          && currentArg.substring(2).equalsIgnoreCase(RESILIENT_OPTION)) {
+          // User has explicitly requested the operation to be resilient
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public void configureContextForExport(ExportJobContext context) {
+    if (isResilientOperation(context.getOptions())) {
+      context.setOutputFormatClass(SQLServerResilientExportOutputFormat.class);
+      configureConnectionRecoveryForExport(context);
+    } else {
+      context.setOutputFormatClass(SqlServerExportBatchOutputFormat.class);
+    }
+  }
+
+  /**
+   * Configure SQLServer Sqoop export Jobs to recover failed connections by
+   * using {@link SQLServerConnectionFailureHandler}. This can be overridden by setting the
+   * {@link SQLServerResilientExportOutputFormat#EXPORT_FAILURE_HANDLER_CLASS} in the configuration.
+   */
+  private void configureConnectionRecoveryForExport(
+      ExportJobContext context) {
+
+    Configuration conf = context.getOptions().getConf();
+
+    // Set connection failure handler and recovery settings
+    // Can be overridden if provided as Configuration
+    // properties by the user
+    String clsFailureHandler = conf.get(
+      SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS);
+    if (clsFailureHandler == null) {
+      conf.set(
+        SQLServerResilientExportOutputFormat.EXPORT_FAILURE_HANDLER_CLASS,
+        SQLServerConnectionFailureHandler.class.getName());
+    }
+  }
+
+  /**
+   * Configure SQLServer Sqoop Jobs to recover failed connections by using
+   * {@link SQLServerConnectionFailureHandler}. This can be overridden by setting the
+   * {@link SQLServerDBInputFormat#IMPORT_FAILURE_HANDLER_CLASS} in the configuration.
+   */
+  private void configureConnectionRecoveryForImport(
+      ImportJobContext context) {
+
+    Configuration conf = context.getOptions().getConf();
+
+    // Configure input format class
+    context.setInputFormat(SQLServerDBInputFormat.class);
+
+    // Set connection failure handler and recovery settings
+    // Can be overridden if provided as Configuration
+    // properties by the user
+    if (conf.get(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS) == null) {
+      conf.set(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS,
+        SQLServerConnectionFailureHandler.class.getName());
+    }
+  }
+
+  public void configureContextForImport(ImportJobContext context, String splitCol) {
+    if (isResilientOperation(context.getOptions())) {
+      // Enable connection recovery only if split column is provided
+      if (splitCol != null) {
+        // Configure SQLServer table import jobs for connection recovery
+        configureConnectionRecoveryForImport(context);
+      }
+    }
+  }
+
+  /**
+   *
+   * @param context
+   * @param manager
+   * @return whether the job should be executed as an exportjob
+   */
+  public boolean configureContextForUpdate(ExportJobContext context, SQLServerManager manager) {
+    boolean runAsExportJob = isResilientOperation(context.getOptions());
+    if (runAsExportJob) {
+      context.setConnManager(manager);
+      context.setOutputFormatClass(SQLServerResilientUpdateOutputFormat.class);
+      configureConnectionRecoveryForExport(context);
+    }
+    return runAsExportJob;
+  }
+}
index c83c2c9..fc1c489 100644 (file)
 
 package org.apache.sqoop.manager.sqlserver;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.sqoop.ConnFactory;
+import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.manager.SQLServerManager;
+import org.apache.sqoop.testutil.ArgumentArrayBuilder;
+import org.apache.sqoop.testutil.ImportJobTestCase;
+import org.apache.sqoop.util.FileListing;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
-import org.apache.sqoop.SqoopOptions;
-import org.apache.sqoop.testutil.CommonArgs;
-import org.apache.sqoop.testutil.ImportJobTestCase;
-import org.apache.sqoop.util.FileListing;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(Parameterized.class)
 /**
  * Test the SQLServerManager implementation.
  *
@@ -92,8 +95,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase {
   static final String DBO_TABLE_NAME = "EMPLOYEES_MSSQL";
   static final String SCHEMA_SCH = "sch";
   static final String SCH_TABLE_NAME = "PRIVATE_TABLE";
-  static final String CONNECT_STRING = HOST_URL
-              + ";databaseName=" + DATABASE_NAME;
+  static final String CONNECT_STRING = HOST_URL + ";databaseName=" + DATABASE_NAME;
 
   static final String CONNECTOR_FACTORY = System.getProperty(
       "sqoop.test.msserver.connector.factory",
@@ -103,8 +105,34 @@ public class SQLServerManagerImportTest extends ImportJobTestCase {
   private SQLServerManager manager;
 
   private Configuration conf = new Configuration();
+
   private Connection conn = null;
 
+  public static final String[] EXPECTED_RESULTS = new String[]{
+      "1,Aaron,1000000.0,engineering",
+      "2,Bob,400.0,sales",
+      "3,Fred,15.0,marketing",
+  };
+
+  @Parameters(name = "Builder: {0}, Table: {1}")
+  public static Iterable<? extends Object> testConfigurations() {
+    ArgumentArrayBuilder builderForTableImportWithExplicitSchema = getArgsBuilderForTableImport().withToolOption("schema", SCHEMA_DBO);
+    return Arrays.asList(
+        new Object[] { getArgsBuilderForQueryImport(), DBO_TABLE_NAME },
+        new Object[] { getArgsBuilderForTableImport(), DBO_TABLE_NAME },
+        new Object[] { getArgsBuilderForDifferentSchemaTableImport(), SCH_TABLE_NAME },
+        new Object[] { builderForTableImportWithExplicitSchema, DBO_TABLE_NAME }
+    );
+  }
+
+  private final ArgumentArrayBuilder builder;
+  private final String tableName;
+
+  public SQLServerManagerImportTest(ArgumentArrayBuilder builder, String tableName) {
+    this.builder = builder;
+    this.tableName = tableName;
+  }
+
   @Override
   protected Configuration getConf() {
     return conf;
@@ -124,8 +152,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase {
   public void setUp() {
     super.setUp();
 
-    SqoopOptions options = new SqoopOptions(CONNECT_STRING,
-      DBO_TABLE_NAME);
+    SqoopOptions options = new SqoopOptions(CONNECT_STRING, DBO_TABLE_NAME);
     options.setUsername(DATABASE_USER);
     options.setPassword(DATABASE_PASSWORD);
 
@@ -241,96 +268,74 @@ public class SQLServerManagerImportTest extends ImportJobTestCase {
 
   @Test
   public void testImportSimple() throws IOException {
-    String [] expectedResults = {
-      "1,Aaron,1000000.0,engineering",
-      "2,Bob,400.0,sales",
-      "3,Fred,15.0,marketing",
-    };
-
-    doImportAndVerify(DBO_TABLE_NAME, expectedResults);
+    doImportAndVerify(builder, tableName);
   }
 
   @Test
-  public void testImportExplicitDefaultSchema() throws IOException {
-    String [] expectedResults = {
-      "1,Aaron,1000000.0,engineering",
-      "2,Bob,400.0,sales",
-      "3,Fred,15.0,marketing",
-    };
-
-    String[] extraArgs = new String[] {"--schema", SCHEMA_DBO};
-
-    doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs);
+  public void testImportTableHints() throws IOException {
+    builder.withToolOption("table-hints", "NOLOCK");
+    doImportAndVerify(builder, tableName);
   }
 
   @Test
-  public void testImportDifferentSchema() throws IOException {
-    String [] expectedResults = {
-      "1,Aaron,1000000.0,engineering",
-      "2,Bob,400.0,sales",
-      "3,Fred,15.0,marketing",
-    };
-
-    String[] extraArgs = new String[] {"--schema", SCHEMA_SCH};
-
-    doImportAndVerify(SCH_TABLE_NAME, expectedResults, extraArgs);
+  public void testImportTableHintsMultiple() throws IOException {
+    builder.withToolOption("table-hints", "NOLOCK,NOWAIT");
+    doImportAndVerify(builder, tableName);
   }
 
   @Test
-  public void testImportTableHints() throws IOException {
-    String [] expectedResults = {
-      "1,Aaron,1000000.0,engineering",
-      "2,Bob,400.0,sales",
-      "3,Fred,15.0,marketing",
-    };
-
-    String[] extraArgs = new String[] {"--table-hints", "NOLOCK"};
-    doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs);
+  public void testImportTableResilient() throws IOException {
+    builder.withToolOption("resilient");
+    doImportAndVerify(builder, tableName);
   }
 
+  /**
+   * The resilient option was named non-resilient before, but got renamed.
+   * This test is here to ensure backward compatibility in the sense that
+   * using the non-resilient option won't break any job.
+   *
+   * @throws IOException
+   */
   @Test
-  public void testImportTableHintsMultiple() throws IOException {
-    String [] expectedResults = {
-      "1,Aaron,1000000.0,engineering",
-      "2,Bob,400.0,sales",
-      "3,Fred,15.0,marketing",
-    };
+  public void testImportTableNonResilient() throws IOException {
+    builder.withToolOption("non-resilient");
+    doImportAndVerify(builder, tableName);
+  }
 
-    String[] extraArgs = new String[] {"--table-hints", "NOLOCK,NOWAIT"};
-    doImportAndVerify(DBO_TABLE_NAME, expectedResults, extraArgs);
+  private static ArgumentArrayBuilder getArgsBuilder() {
+    ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
+    return builder.withCommonHadoopFlags(true)
+        .withOption("connect", CONNECT_STRING)
+        .withOption("username", DATABASE_USER)
+        .withOption("password", DATABASE_PASSWORD)
+        .withOption("num-mappers",  "1")
+        .withOption("split-by", "id");
   }
 
-  private String [] getArgv(String tableName, String ... extraArgs) {
-    ArrayList<String> args = new ArrayList<String>();
-
-    CommonArgs.addHadoopFlags(args);
-
-    args.add("--table");
-    args.add(tableName);
-    args.add("--warehouse-dir");
-    args.add(getWarehouseDir());
-    args.add("--connect");
-    args.add(CONNECT_STRING);
-    args.add("--username");
-    args.add(DATABASE_USER);
-    args.add("--password");
-    args.add(DATABASE_PASSWORD);
-    args.add("--num-mappers");
-    args.add("1");
-
-    if (extraArgs.length > 0) {
-      args.add("--");
-      for (String arg : extraArgs) {
-        args.add(arg);
-      }
-    }
+  private static ArgumentArrayBuilder getArgsBuilderForTableImport() {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    return builder.withCommonHadoopFlags(true)
+        .withOption("warehouse-dir", LOCAL_WAREHOUSE_DIR)
+        .withOption("table", DBO_TABLE_NAME);
+  }
+
+  private static ArgumentArrayBuilder getArgsBuilderForQueryImport() {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    return builder.withCommonHadoopFlags(true)
+        .withOption("query", "SELECT * FROM EMPLOYEES_MSSQL WHERE $CONDITIONS")
+        .withOption("target-dir", LOCAL_WAREHOUSE_DIR + "/" + DBO_TABLE_NAME);
+  }
 
-    return args.toArray(new String[0]);
+  private static ArgumentArrayBuilder getArgsBuilderForDifferentSchemaTableImport() {
+    ArgumentArrayBuilder builder = getArgsBuilder();
+    return builder.withCommonHadoopFlags(true)
+        .withOption("warehouse-dir", LOCAL_WAREHOUSE_DIR)
+        .withOption("table", SCH_TABLE_NAME)
+        .withToolOption("schema", SCHEMA_SCH);
   }
 
-  private void doImportAndVerify(String tableName,
-                                 String [] expectedResults,
-                                 String ... extraArgs) throws IOException {
+  private void doImportAndVerify(ArgumentArrayBuilder argBuilder,
+                                 String tableName) throws IOException {
 
     Path warehousePath = new Path(this.getWarehouseDir());
     Path tablePath = new Path(warehousePath, tableName);
@@ -342,7 +347,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase {
       FileListing.recursiveDeleteDir(tableFile);
     }
 
-    String [] argv = getArgv(tableName, extraArgs);
+    String [] argv = argBuilder.build();
     try {
       runImport(argv);
     } catch (IOException ioe) {
@@ -357,7 +362,7 @@ public class SQLServerManagerImportTest extends ImportJobTestCase {
     try {
       // Read through the file and make sure it's all there.
       r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
-      for (String expectedLine : expectedResults) {
+      for (String expectedLine : EXPECTED_RESULTS) {
         assertEquals(expectedLine, r.readLine());
       }
     } catch (IOException ioe) {
diff --git a/src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java b/src/test/org/apache/sqoop/manager/sqlserver/TestSqlServerManagerContextConfigurator.java
new file mode 100644 (file)
index 0000000..c0d0a24
--- /dev/null
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager.sqlserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.manager.ExportJobContext;
+import org.apache.sqoop.manager.ImportJobContext;
+import org.apache.sqoop.manager.SqlServerManagerContextConfigurator;
+import org.apache.sqoop.mapreduce.SQLServerResilientExportOutputFormat;
+import org.apache.sqoop.mapreduce.SQLServerResilientUpdateOutputFormat;
+import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.db.SQLServerDBInputFormat;
+import org.apache.sqoop.mapreduce.sqlserver.SqlServerExportBatchOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test methods in the configuration utilities.
+ */
+public class TestSqlServerManagerContextConfigurator {
+
+  public static final Log LOG = LogFactory.getLog(TestSqlServerManagerContextConfigurator.class.getName());
+
+  private final SqlServerManagerContextConfigurator formatConfigurator = new SqlServerManagerContextConfigurator();
+
+  private SqoopOptions options;
+
+  @Test
+  public void testResilientImportContextConfiguration() {
+    String[] extraArgs = {"--resilient"};
+    options.setExtraArgs(extraArgs);
+
+    ImportJobContext context = new ImportJobContext("TABLE_NAME", "example.jar", options, null);
+    formatConfigurator.configureContextForImport(context, "id");
+    Class<? extends InputFormat> inputFormat = context.getInputFormat();
+    assertThat(inputFormat).isSameAs(SQLServerDBInputFormat.class);
+  }
+
+  @Test
+  public void testNonResilientImportContextConfiguration() {
+    String[] extraArgs = {"--non-resilient"};
+    options.setExtraArgs(extraArgs);
+
+    ImportJobContext context = new ImportJobContext("TABLE_NAME", "example.jar", options, null);
+    formatConfigurator.configureContextForImport(context, "id");
+    Class<? extends InputFormat> inputFormat = context.getInputFormat();
+    assertThat(inputFormat).isSameAs(DataDrivenDBInputFormat.class);
+  }
+
+  @Test
+  public void testResilientExportContextConfiguration() {
+    String[] extraArgs = {"--resilient"};
+    options.setExtraArgs(extraArgs);
+
+    ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options);
+    formatConfigurator.configureContextForExport(context);
+    Class outputFormatClass = context.getOutputFormatClass();
+    assertThat(outputFormatClass).isSameAs(SQLServerResilientExportOutputFormat.class);
+  }
+
+  @Test
+  public void testNonResilientExportContextConfiguration() {
+    String[] extraArgs = {"--non-resilient"};
+    options.setExtraArgs(extraArgs);
+
+    ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options);
+    formatConfigurator.configureContextForExport(context);
+    Class outputFormatClass = context.getOutputFormatClass();
+    assertThat(outputFormatClass).isSameAs(SqlServerExportBatchOutputFormat.class);
+  }
+
+  @Test
+  public void testResilientUpdateContextConfiguration() {
+    String[] extraArgs = {"--resilient"};
+    options.setExtraArgs(extraArgs);
+
+    ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options);
+    formatConfigurator.configureContextForUpdate(context, null);
+    Class outputFormatClass = context.getOutputFormatClass();
+    assertThat(outputFormatClass).isSameAs(SQLServerResilientUpdateOutputFormat.class);
+  }
+
+  @Test
+  public void testNonResilientUpdateContextConfiguration() {
+    String[] extraArgs = {"--non-resilient"};
+    options.setExtraArgs(extraArgs);
+
+    ExportJobContext context = new ExportJobContext("TABLE_NAME", "example.jar", options);
+    formatConfigurator.configureContextForUpdate(context, null);
+    Class outputFormatClass = context.getOutputFormatClass();
+    assertThat(outputFormatClass).isNull();
+  }
+
+  @Before
+  public void setUp() {
+    Configuration conf = new Configuration();
+    this.options = new SqoopOptions(conf);
+  }
+}