SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / java / org / apache / sqoop / mapreduce / ImportJobBase.java
index 2465f3f..ab7f21e 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.PerfCounters;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
@@ -92,6 +93,13 @@ public class ImportJobBase extends JobBase {
 
     job.setOutputFormatClass(getOutputFormatClass());
 
+    if (isHCatJob) {
+      LOG.debug("Configuring output format for HCatalog  import job");
+      SqoopHCatUtilities.configureImportOutputFormat(options, job,
+        getContext().getConnManager(), tableName, job.getConfiguration());
+      return;
+    }
+
     if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
       job.getConfiguration().set("mapred.output.value.class", tableClassName);
     }
@@ -149,6 +157,11 @@ public class ImportJobBase extends JobBase {
     perfCounters.startClock();
 
     boolean success = doSubmitJob(job);
+
+    if (isHCatJob) {
+      SqoopHCatUtilities.instance().invokeOutputCommitterForLocalMode(job);
+    }
+
     perfCounters.stopClock();
 
     Counters jobCounters = job.getCounters();