SQOOP-3319: Extract code using Kite into separate classes
author Boglarka Egyed <bogi@apache.org>
Tue, 29 May 2018 08:17:25 +0000 (10:17 +0200)
committer Boglarka Egyed <bogi@apache.org>
Tue, 29 May 2018 08:17:25 +0000 (10:17 +0200)
(Szabolcs Vasas via Boglarka Egyed)

43 files changed:
src/java/org/apache/sqoop/avro/AvroUtil.java
src/java/org/apache/sqoop/manager/ConnManager.java
src/java/org/apache/sqoop/manager/CubridManager.java
src/java/org/apache/sqoop/manager/Db2Manager.java
src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
src/java/org/apache/sqoop/manager/MainframeManager.java
src/java/org/apache/sqoop/manager/MySQLManager.java
src/java/org/apache/sqoop/manager/OracleManager.java
src/java/org/apache/sqoop/manager/SQLServerManager.java
src/java/org/apache/sqoop/manager/SqlManager.java
src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
src/java/org/apache/sqoop/mapreduce/MergeJob.java
src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java [moved from src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java with 73% similarity]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java [moved from src/java/org/apache/sqoop/mapreduce/ParquetJob.java with 93% similarity]
src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/java/org/apache/sqoop/tool/MergeTool.java
src/test/org/apache/sqoop/TestParquetImport.java
src/test/org/apache/sqoop/hive/TestHiveImport.java
src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java
src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java

diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 603cc63..57c2062 100644 (file)
@@ -340,4 +340,8 @@ public final class AvroUtil {
 
     return LogicalTypes.decimal(precision, scale);
   }
+
+  public static Schema parseAvroSchema(String schemaString) {
+    return new Schema.Parser().parse(schemaString);
+  }
 }
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d7d6279..c80dd5d 100644 (file)
@@ -45,6 +45,8 @@ import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.hive.HiveTypes;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
 import org.apache.sqoop.util.ExportException;
 import org.apache.sqoop.util.ImportException;
 
@@ -866,5 +868,8 @@ public abstract class ConnManager {
     return false;
   }
 
+  public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
+    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
+  }
 }
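
Illustration (not part of the patch): a minimal sketch of how a connection manager is expected to use the new getParquetJobConfigurator() hook when building an export job, mirroring the CubridManager/Db2Manager/OracleManager changes below. The helper class name is hypothetical; ConnManager, ExportJobContext, JdbcExportJob and ExportBatchOutputFormat are the existing Sqoop classes touched by this patch.

    import java.io.IOException;

    import org.apache.sqoop.manager.ConnManager;
    import org.apache.sqoop.manager.ExportJobContext;
    import org.apache.sqoop.mapreduce.ExportBatchOutputFormat;
    import org.apache.sqoop.mapreduce.JdbcExportJob;
    import org.apache.sqoop.util.ExportException;

    // Hypothetical helper, for illustration only.
    public final class ParquetAwareExportExample {

      // Runs a batch JDBC export the way the managers below do after this change: the
      // Parquet-specific pieces come from the factory exposed on ConnManager, so callers
      // never reference Kite classes directly.
      static void runExport(ConnManager manager, ExportJobContext context)
          throws IOException, ExportException {
        context.setConnManager(manager);
        JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
            ExportBatchOutputFormat.class,
            manager.getParquetJobConfigurator().createParquetExportJobConfigurator());
        exportJob.runExport();
      }

      private ParquetAwareExportExample() { }
    }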
 
diff --git a/src/java/org/apache/sqoop/manager/CubridManager.java b/src/java/org/apache/sqoop/manager/CubridManager.java
index e27f616..a75268f 100644 (file)
@@ -65,7 +65,7 @@ public class CubridManager extends
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-        ExportBatchOutputFormat.class);
+        ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
 
     exportJob.runExport();
   }
@@ -80,7 +80,7 @@ public class CubridManager extends
     context.setConnManager(this);
 
     JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
-        CubridUpsertOutputFormat.class);
+        CubridUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java
index 7ff68ce..c78946e 100644 (file)
@@ -111,7 +111,7 @@ public class Db2Manager
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-      ExportBatchOutputFormat.class);
+      ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index c05e1c1..70b9b43 100644 (file)
@@ -585,7 +585,8 @@ public class DirectPostgresqlManager
       new PostgreSQLCopyExportJob(context,
                                   null,
                                   ExportInputFormat.class,
-                                  NullOutputFormat.class);
+                                  NullOutputFormat.class,
+                                  getParquetJobConfigurator().createParquetExportJobConfigurator());
     job.runExport();
   }
 }
diff --git a/src/java/org/apache/sqoop/manager/MainframeManager.java b/src/java/org/apache/sqoop/manager/MainframeManager.java
index a6002ef..4e8be15 100644 (file)
@@ -90,7 +90,7 @@ public class MainframeManager extends org.apache.sqoop.manager.ConnManager {
       importer = new AccumuloImportJob(opts, context);
     } else {
       // Import to HDFS.
-      importer = new MainframeImportJob(opts, context);
+      importer = new MainframeImportJob(opts, context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     importer.setInputFormatClass(MainframeDatasetInputFormat.class);
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index 2d17707..992c461 100644 (file)
@@ -138,7 +138,7 @@ public class MySQLManager
     LOG.warn("documentation for additional limitations.");
 
     JdbcUpsertExportJob exportJob =
-      new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+      new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index b7005d4..cdc6c54 100644 (file)
@@ -462,7 +462,7 @@ public class OracleManager
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context,
-            null, null, ExportBatchOutputFormat.class);
+            null, null, ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -474,7 +474,7 @@ public class OracleManager
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcUpsertExportJob exportJob =
-      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index d57a493..b136087 100644 (file)
@@ -181,10 +181,10 @@ public class SQLServerManager
     JdbcExportJob exportJob;
     if (isNonResilientOperation()) {
       exportJob = new JdbcExportJob(context, null, null,
-      SqlServerExportBatchOutputFormat.class);
+      SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     } else {
       exportJob = new JdbcExportJob(context, null, null,
-        SQLServerResilientExportOutputFormat.class);
+        SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
       configureConnectionRecoveryForExport(context);
     }
     exportJob.runExport();
@@ -202,7 +202,7 @@ public class SQLServerManager
     } else {
       context.setConnManager(this);
       JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
-        null, SQLServerResilientUpdateOutputFormat.class);
+        null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
       configureConnectionRecoveryForUpdate(context);
       exportJob.runExport();
     }
@@ -223,7 +223,7 @@ public class SQLServerManager
     }
 
     JdbcUpsertExportJob exportJob =
-        new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class);
+        new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 4572098..d82332a 100644 (file)
@@ -682,7 +682,7 @@ public abstract class SqlManager
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-              context);
+              context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     checkTableImportOptions(context);
@@ -725,7 +725,7 @@ public abstract class SqlManager
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-          context);
+          context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     String splitCol = getSplitColumn(opts, null);
@@ -926,7 +926,7 @@ public abstract class SqlManager
   public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcExportJob exportJob = new JdbcExportJob(context);
+    JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -935,7 +935,7 @@ public abstract class SqlManager
       throws IOException,
       ExportException {
     context.setConnManager(this);
-    JdbcCallExportJob exportJob = new JdbcCallExportJob(context);
+    JdbcCallExportJob exportJob = new JdbcCallExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -960,7 +960,7 @@ public abstract class SqlManager
       org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
+    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
index 10524e3..95eaacf 100644 (file)
@@ -321,7 +321,7 @@ public class OraOopConnManager extends GenericJdbcManager {
       throw ex;
     }
     JdbcExportJob exportJob =
-        new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
+        new JdbcExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -343,7 +343,7 @@ public class OraOopConnManager extends GenericJdbcManager {
     }
 
     JdbcUpdateExportJob exportJob =
-        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
+        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index a5962ba..3b54210 100644 (file)
@@ -26,8 +26,6 @@ import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -48,10 +46,8 @@ import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.apache.sqoop.orm.AvroSchemaGenerator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the
@@ -62,15 +58,24 @@ public class DataDrivenImportJob extends ImportJobBase {
   public static final Log LOG = LogFactory.getLog(
       DataDrivenImportJob.class.getName());
 
-  @SuppressWarnings("unchecked")
-  public DataDrivenImportJob(final SqoopOptions opts) {
-    super(opts, null, DataDrivenDBInputFormat.class, null, null);
+  private final ParquetImportJobConfigurator parquetImportJobConfigurator;
+
+  public DataDrivenImportJob(final SqoopOptions opts,
+      final Class<? extends InputFormat> inputFormatClass,
+      ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+    super(opts, null, inputFormatClass, null, context);
+    this.parquetImportJobConfigurator = parquetImportJobConfigurator;
   }
 
   public DataDrivenImportJob(final SqoopOptions opts,
       final Class<? extends InputFormat> inputFormatClass,
       ImportJobContext context) {
-    super(opts, null, inputFormatClass, null, context);
+    this(opts, inputFormatClass, context, null);
+  }
+
+  @SuppressWarnings("unchecked")
+  public DataDrivenImportJob(final SqoopOptions opts) {
+    this(opts, DataDrivenDBInputFormat.class, null);
   }
 
   @Override
@@ -101,53 +106,20 @@ public class DataDrivenImportJob extends ImportJobBase {
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      JobConf conf = (JobConf)job.getConfiguration();
       // Kite SDK requires an Avro schema to represent the data structure of
       // target dataset. If the schema name equals to generated java class name,
       // the import will fail. So we use table name as schema name and add a
       // prefix "codegen_" to generated java class to avoid the conflict.
       final String schemaNameOverride = tableName;
       Schema schema = generateAvroSchema(tableName, schemaNameOverride);
-      String uri = getKiteUri(conf, tableName);
-      ParquetJob.WriteMode writeMode;
-
-      if (options.doHiveImport()) {
-        if (options.doOverwriteHiveTable()) {
-          writeMode = ParquetJob.WriteMode.OVERWRITE;
-        } else {
-          writeMode = ParquetJob.WriteMode.APPEND;
-          if (Datasets.exists(uri)) {
-            LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
-                "append data into the existing Hive table. Consider using " +
-                "--hive-overwrite, if you do NOT intend to do appending.");
-          }
-        }
-      } else {
-        // Note that there is no such an import argument for overwriting HDFS
-        // dataset, so overwrite mode is not supported yet.
-        // Sqoop's append mode means to merge two independent datasets. We
-        // choose DEFAULT as write mode.
-        writeMode = ParquetJob.WriteMode.DEFAULT;
-      }
-      ParquetJob.configureImportJob(conf, schema, uri, writeMode);
+      Path destination = getContext().getDestination();
+
+      parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
     }
 
     job.setMapperClass(getMapperClass());
   }
 
-  private String getKiteUri(Configuration conf, String tableName) throws IOException {
-    if (options.doHiveImport()) {
-      String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
-          options.getHiveDatabaseName();
-      String hiveTable = options.getHiveTableName() == null ? tableName :
-          options.getHiveTableName();
-      return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
-    } else {
-      Path destination = getContext().getDestination();
-      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
-    }
-  }
-
   private Schema generateAvroSchema(String tableName,
       String schemaNameOverride) throws IOException {
     ConnManager connManager = getContext().getConnManager();
@@ -187,7 +159,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       return AvroImportMapper.class;
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      return ParquetImportMapper.class;
+      return parquetImportJobConfigurator.getMapperClass();
     }
 
     return null;
@@ -210,7 +182,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       return AvroOutputFormat.class;
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      return DatasetKeyOutputFormat.class;
+      return parquetImportJobConfigurator.getOutputFormatClass();
     }
 
     return null;
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index fb5d054..17c9ed3 100644 (file)
@@ -49,6 +49,8 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Date;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
 /**
  * Base class for running an import MapReduce job.
  * Allows dependency injection, etc, for easy customization of import job types.
@@ -149,7 +151,7 @@ public class ImportJobBase extends JobBase {
           Configuration conf = job.getConfiguration();
           String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
           if (!shortName.equalsIgnoreCase("default")) {
-            conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
+            conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
           }
         }
       }
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
index b7eea93..be82aed 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.sqoop.mapreduce.db.DBOutputFormat;
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import com.google.common.base.Strings;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an export using JDBC (JDBC-based ExportCallOutputFormat) to
@@ -43,15 +44,16 @@ public class JdbcCallExportJob extends JdbcExportJob {
   public static final Log LOG = LogFactory.getLog(
       JdbcCallExportJob.class.getName());
 
-  public JdbcCallExportJob(final ExportJobContext context) {
-    super(context, null, null, ExportCallOutputFormat.class);
+  public JdbcCallExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(context, null, null, ExportCallOutputFormat.class, parquetExportJobConfigurator);
   }
 
   public JdbcCallExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
-    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
   }
 
   /**
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 3719836..e283548 100644 (file)
@@ -32,11 +32,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -45,18 +44,23 @@ public class JdbcExportJob extends ExportJobBase {
 
   private FileType fileType;
 
+  private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
   public static final Log LOG = LogFactory.getLog(
       JdbcExportJob.class.getName());
 
-  public JdbcExportJob(final ExportJobContext context) {
+  public JdbcExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(context);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   public JdbcExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   @Override
@@ -78,8 +82,7 @@ public class JdbcExportJob extends ExportJobBase {
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
-      DatasetKeyInputFormat.configure(job).readFrom(uri);
+      parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
     }
   }
 
@@ -120,7 +123,7 @@ public class JdbcExportJob extends ExportJobBase {
       case AVRO_DATA_FILE:
         return AvroInputFormat.class;
       case PARQUET_FILE:
-        return DatasetKeyInputFormat.class;
+        return parquetExportJobConfigurator.getInputFormatClass();
       default:
         return super.getInputFormatClass();
     }
@@ -137,7 +140,7 @@ public class JdbcExportJob extends ExportJobBase {
       case AVRO_DATA_FILE:
         return AvroExportMapper.class;
       case PARQUET_FILE:
-        return ParquetExportMapper.class;
+        return parquetExportJobConfigurator.getMapperClass();
       case UNKNOWN:
       default:
         return TextExportMapper.class;
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index 86069c4..f901d37 100644 (file)
@@ -33,15 +33,13 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
 
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DBOutputFormat;
-import org.apache.sqoop.util.FileSystemUtil;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -53,6 +51,8 @@ public class JdbcUpdateExportJob extends ExportJobBase {
   public static final Log LOG = LogFactory.getLog(
       JdbcUpdateExportJob.class.getName());
 
+  private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
   /**
    * Return an instance of the UpdateOutputFormat class object loaded
    * from the shim jar.
@@ -62,16 +62,19 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     return UpdateOutputFormat.class;
   }
 
-  public JdbcUpdateExportJob(final ExportJobContext context)
+  public JdbcUpdateExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator)
       throws IOException {
     super(context, null, null, getUpdateOutputFormat());
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   public JdbcUpdateExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   // Fix For Issue [SQOOP-2846]
@@ -86,7 +89,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     case AVRO_DATA_FILE:
       return AvroExportMapper.class;
     case PARQUET_FILE:
-      return ParquetExportMapper.class;
+      return parquetExportJobConfigurator.getMapperClass();
     case UNKNOWN:
     default:
       return TextExportMapper.class;
@@ -186,8 +189,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
-      DatasetKeyInputFormat.configure(job).readFrom(uri);
+      parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
     }
   }
 
@@ -222,7 +224,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     case AVRO_DATA_FILE:
       return AvroInputFormat.class;
     case PARQUET_FILE:
-      return DatasetKeyInputFormat.class;
+      return parquetExportJobConfigurator.getInputFormatClass();
     default:
       return super.getInputFormatClass();
     }
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
index 9a8c17a..4db86da 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
@@ -40,9 +41,10 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
       JdbcUpsertExportJob.class.getName());
 
   public JdbcUpsertExportJob(final ExportJobContext context,
-      final Class<? extends OutputFormat> outputFormatClass)
+                             final Class<? extends OutputFormat> outputFormatClass,
+                             final ParquetExportJobConfigurator parquetExportJobConfigurator)
       throws IOException {
-    super(context, null, null, outputFormatClass);
+    super(context, null, null, outputFormatClass, parquetExportJobConfigurator);
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index bb21b64..c26a090 100644 (file)
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -44,17 +38,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.util.Jars;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import parquet.avro.AvroParquetInputFormat;
-import parquet.avro.AvroSchemaConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.schema.MessageType;
 
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.mapreduce.JobBase;
@@ -79,10 +64,11 @@ public class MergeJob extends JobBase {
    */
   public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
 
-  public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
+  private final ParquetMergeJobConfigurator parquetMergeJobConfigurator;
 
-  public MergeJob(final SqoopOptions opts) {
+  public MergeJob(final SqoopOptions opts, final ParquetMergeJobConfigurator parquetMergeJobConfigurator) {
     super(opts, null, null, null);
+    this.parquetMergeJobConfigurator = parquetMergeJobConfigurator;
   }
 
   public boolean runMergeJob() throws IOException {
@@ -147,7 +133,7 @@ public class MergeJob extends JobBase {
         case PARQUET_FILE:
           Path finalPath = new Path(options.getTargetDir());
           finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
-          configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
+          parquetMergeJobConfigurator.configureParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
           break;
         case AVRO_DATA_FILE:
           configueAvroMergeJob(conf, job, oldPath, newPath);
@@ -198,51 +184,6 @@ public class MergeJob extends JobBase {
     job.setReducerClass(MergeAvroReducer.class);
     AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
   }
-
-  private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
-      Path finalPath) throws IOException {
-    try {
-      FileSystem fileSystem = finalPath.getFileSystem(conf);
-      LOG.info("Trying to merge parquet files");
-      job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class);
-      job.setMapperClass(MergeParquetMapper.class);
-      job.setReducerClass(MergeParquetReducer.class);
-      job.setOutputValueClass(NullWritable.class);
-
-      List<Footer> footers = new ArrayList<Footer>();
-      FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
-      FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
-      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
-      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
-
-      MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
-      AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-      Schema avroSchema = avroSchemaConverter.convert(schema);
-
-      if (!fileSystem.exists(finalPath)) {
-        Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
-        DatasetKeyOutputFormat.configure(job).overwrite(dataset);
-      } else {
-        DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
-      }
-
-      job.setInputFormatClass(AvroParquetInputFormat.class);
-      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
-
-      conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString());
-      Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
-
-      job.setOutputFormatClass(outClass);
-    } catch (Exception cnfe) {
-      throw new IOException(cnfe);
-    }
-  }
-
-  public static Dataset createDataset(Schema schema, String uri) {
-    DatasetDescriptor descriptor =
-        new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
-    return Datasets.create(uri, descriptor, GenericRecord.class);
-  }
 }
 
 
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
index caa4f5f..5939b01 100644 (file)
@@ -27,16 +27,16 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.sqoop.avro.AvroUtil;
 
 import org.apache.sqoop.lib.SqoopRecord;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
 
-public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord,NullWritable> {
+
+public abstract class MergeParquetReducer<KEYOUT, VALUEOUT> extends Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
 
   private Schema schema = null;
   private boolean bigDecimalFormatString = true;
@@ -44,7 +44,7 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
-      schema = new Schema.Parser().parse(context.getConfiguration().get("parquetjob.avro.schema"));
+      schema = new Schema.Parser().parse(context.getConfiguration().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
       bigDecimalFormatString = context.getConfiguration().getBoolean(
           ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
     }
@@ -67,9 +67,12 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
       }
 
       if (null != bestRecord) {
-        GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
+        GenericRecord record = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
             bigDecimalFormatString);
-        context.write(outKey, null);
+        write(context, record);
       }
     }
+
+  protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
+
 }
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
index 35ab495..62334f8 100644 (file)
@@ -23,10 +23,7 @@ import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.sqoop.avro.AvroUtil;
 
 import java.io.IOException;
@@ -35,9 +32,9 @@ import java.sql.SQLException;
 /**
  * Imports records by writing them to a Parquet File.
  */
-public class ParquetImportMapper
+public abstract class ParquetImportMapper<KEYOUT, VALOUT>
     extends AutoProgressMapper<LongWritable, SqoopRecord,
-        GenericRecord, NullWritable> {
+    KEYOUT, VALOUT> {
 
   private Schema schema = null;
   private boolean bigDecimalFormatString = true;
@@ -47,11 +44,11 @@ public class ParquetImportMapper
   protected void setup(Context context)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-    schema = ParquetJob.getAvroSchema(conf);
+    schema = getAvroSchema(conf);
     bigDecimalFormatString = conf.getBoolean(
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
-    lobLoader = new LargeObjectLoader(conf, new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID())));
+    lobLoader = createLobLoader(context);
   }
 
   @Override
@@ -64,9 +61,9 @@ public class ParquetImportMapper
       throw new IOException(sqlE);
     }
 
-    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
+    GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
         bigDecimalFormatString);
-    context.write(outKey, null);
+    write(context, record);
   }
 
   @Override
@@ -76,4 +73,9 @@ public class ParquetImportMapper
     }
   }
 
+  protected abstract LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException;
+
+  protected abstract Schema getAvroSchema(Configuration configuration);
+
+  protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
 }
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
index 7e975c7..8ef30d3 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.manager.ImportJobContext;
 
 import org.apache.sqoop.mapreduce.DataDrivenImportJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 
 /**
  * Import data from a mainframe dataset, using MainframeDatasetInputFormat.
@@ -39,8 +40,8 @@ public class MainframeImportJob extends DataDrivenImportJob {
   private static final Log LOG = LogFactory.getLog(
       MainframeImportJob.class.getName());
 
-  public MainframeImportJob(final SqoopOptions opts, ImportJobContext context) {
-    super(opts, MainframeDatasetInputFormat.class, context);
+  public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+    super(opts, MainframeDatasetInputFormat.class, context, parquetImportJobConfigurator);
   }
 
   @Override
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
new file mode 100644 (file)
index 0000000..ae53a96
--- /dev/null
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce.parquet;
+
+public final class ParquetConstants {
+
+  public static final String SQOOP_PARQUET_AVRO_SCHEMA_KEY = "parquetjob.avro.schema";
+
+  public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec";
+
+  private ParquetConstants() {
+    throw new AssertionError("This class is meant for static use only.");
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
new file mode 100644 (file)
index 0000000..8d7b87f
--- /dev/null
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public interface ParquetExportJobConfigurator {
+
+  void configureInputFormat(Job job, Path inputPath) throws IOException;
+
+  Class<? extends Mapper> getMapperClass();
+
+  Class<? extends InputFormat> getInputFormatClass();
+}
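
A hedged sketch of how an export job drives this interface (compare the JdbcExportJob and JdbcUpdateExportJob hunks above); the wrapper class and method below are assumptions made for the example, not part of the patch.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;

    // Hypothetical wrapper, for illustration only.
    public final class ParquetExportWiringExample {

      // Roughly mirrors what the export jobs do for the PARQUET_FILE file type after this
      // patch: input format configuration, mapper class and input format class all come
      // from the configurator instead of hard-coded Kite classes.
      static void wireParquetExport(Job job, Path inputPath,
          ParquetExportJobConfigurator configurator) throws IOException {
        configurator.configureInputFormat(job, inputPath);
        job.setMapperClass(configurator.getMapperClass());
        job.setInputFormatClass(configurator.getInputFormatClass());
      }

      private ParquetExportWiringExample() { }
    }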
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
new file mode 100644 (file)
index 0000000..fa1bc7d
--- /dev/null
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+
+import java.io.IOException;
+
+public interface ParquetImportJobConfigurator {
+
+  void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException;
+
+  Class<? extends Mapper> getMapperClass();
+
+  Class<? extends OutputFormat> getOutputFormatClass();
+
+}
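
Similarly, a hedged sketch of how an import job drives this interface (compare the DataDrivenImportJob hunks above); the wrapper class and method are assumptions made for the example.

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.sqoop.SqoopOptions;
    import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;

    // Hypothetical wrapper, for illustration only.
    public final class ParquetImportWiringExample {

      // Roughly mirrors what DataDrivenImportJob does for the ParquetFile layout after this
      // patch: mapper configuration, mapper class and output format class all come from
      // the configurator instead of hard-coded Kite classes.
      static void wireParquetImport(Job job, Schema schema, SqoopOptions options,
          String tableName, Path destination, ParquetImportJobConfigurator configurator)
          throws IOException {
        configurator.configureMapper(job, schema, options, tableName, destination);
        job.setMapperClass(configurator.getMapperClass());
        job.setOutputFormatClass(configurator.getOutputFormatClass());
      }

      private ParquetImportWiringExample() { }
    }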
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
new file mode 100644 (file)
index 0000000..ed5103f
--- /dev/null
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+public interface ParquetJobConfiguratorFactory {
+
+  ParquetImportJobConfigurator createParquetImportJobConfigurator();
+
+  ParquetExportJobConfigurator createParquetExportJobConfigurator();
+
+  ParquetMergeJobConfigurator createParquetMergeJobConfigurator();
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
new file mode 100644 (file)
index 0000000..2286a52
--- /dev/null
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
+
+public final class ParquetJobConfiguratorFactoryProvider {
+
+  private ParquetJobConfiguratorFactoryProvider() {
+    throw new AssertionError("This class is meant for static use only.");
+  }
+
+  public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
+    return new KiteParquetJobConfiguratorFactory();
+  }
+
+}
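
Usage sketch (assumptions only, not part of the patch): resolving the three configurators through the provider and factory introduced above. The driver class and the Configuration instance are made up for the example; as shown above, the provider currently ignores the configuration and always returns the Kite-backed factory.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
    import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
    import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;

    // Hypothetical driver, for illustration only.
    public final class ParquetConfiguratorFactoryExample {

      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // ConnManager.getParquetJobConfigurator() goes through the same provider call.
        ParquetJobConfiguratorFactory factory =
            ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(conf);

        ParquetImportJobConfigurator importConfigurator = factory.createParquetImportJobConfigurator();
        ParquetExportJobConfigurator exportConfigurator = factory.createParquetExportJobConfigurator();
        ParquetMergeJobConfigurator mergeConfigurator = factory.createParquetMergeJobConfigurator();

        // Import, export and merge jobs receive these in their constructors (see the
        // DataDrivenImportJob, JdbcExportJob and MergeJob hunks above) and delegate all
        // Parquet-specific setup to them.
        System.out.println(importConfigurator.getMapperClass());
        System.out.println(exportConfigurator.getInputFormatClass());
        System.out.println(mergeConfigurator.getClass());
      }

      private ParquetConfiguratorFactoryExample() { }
    }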
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
new file mode 100644 (file)
index 0000000..67fdf66
--- /dev/null
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+public interface ParquetMergeJobConfigurator {
+
+  void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException;
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
new file mode 100644 (file)
index 0000000..7f21205
--- /dev/null
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.MergeParquetReducer;
+
+import java.io.IOException;
+
+public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
+
+  @Override
+  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+    context.write(record, null);
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
new file mode 100644 (file)
index 0000000..ca02c7b
--- /dev/null
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.util.FileSystemUtil;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
+import java.io.IOException;
+
+public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
+
+  @Override
+  public void configureInputFormat(Job job, Path inputPath) throws IOException {
+    String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
+    DatasetKeyInputFormat.configure(job).readFrom(uri);
+  }
+
+  @Override
+  public Class<? extends Mapper> getMapperClass() {
+    return KiteParquetExportMapper.class;
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return DatasetKeyInputFormat.class;
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
similarity index 73%
  * limitations under the License.
  */
 
-package org.apache.sqoop.mapreduce;
+package org.apache.sqoop.mapreduce.parquet.kite;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
 
 import java.io.IOException;
 
 /**
  * Exports Parquet records from a data source.
  */
-public class ParquetExportMapper
-    extends GenericRecordExportMapper<GenericRecord, NullWritable> {
+public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
 
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    super.setup(context);
-  }
-
-  @Override
-  protected void map(GenericRecord key, NullWritable val,
-      Context context) throws IOException, InterruptedException {
+  protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
     context.write(toSqoopRecord(key), NullWritable.get());
   }
 
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
new file mode 100644 (file)
index 0000000..87828d1
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.util.FileSystemUtil;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+
+import java.io.IOException;
+
+public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
+
+  public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
+
+  @Override
+  public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
+    JobConf conf = (JobConf) job.getConfiguration();
+    String uri = getKiteUri(conf, options, tableName, destination);
+    KiteParquetUtils.WriteMode writeMode;
+
+    if (options.doHiveImport()) {
+      if (options.doOverwriteHiveTable()) {
+        writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
+      } else {
+        writeMode = KiteParquetUtils.WriteMode.APPEND;
+        if (Datasets.exists(uri)) {
+          LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
+              "append data into the existing Hive table. Consider using " +
+              "--hive-overwrite, if you do NOT intend to do appending.");
+        }
+      }
+    } else {
+      // Note that there is no such an import argument for overwriting HDFS
+      // dataset, so overwrite mode is not supported yet.
+      // Sqoop's append mode means to merge two independent datasets. We
+      // choose DEFAULT as write mode.
+      writeMode = KiteParquetUtils.WriteMode.DEFAULT;
+    }
+    KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
+  }
+
+  @Override
+  public Class<? extends Mapper> getMapperClass() {
+    return KiteParquetImportMapper.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return DatasetKeyOutputFormat.class;
+  }
+
+  private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
+    if (options.doHiveImport()) {
+      String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
+          options.getHiveDatabaseName();
+      String hiveTable = options.getHiveTableName() == null ? tableName :
+          options.getHiveTableName();
+      return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
+    } else {
+      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
+    }
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
new file mode 100644 (file)
index 0000000..20adf6e
--- /dev/null
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.lib.LargeObjectLoader;
+import org.apache.sqoop.mapreduce.ParquetImportMapper;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
+
+  @Override
+  protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
+    return new LargeObjectLoader(conf, workPath);
+  }
+
+  @Override
+  protected Schema getAvroSchema(Configuration configuration) {
+    String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
+    return AvroUtil.parseAvroSchema(schemaString);
+  }
+
+  @Override
+  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+    context.write(record, null);
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
new file mode 100644 (file)
index 0000000..055e116
--- /dev/null
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+
+public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
+
+  @Override
+  public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
+    return new KiteParquetImportJobConfigurator();
+  }
+
+  @Override
+  public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
+    return new KiteParquetExportJobConfigurator();
+  }
+
+  @Override
+  public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
+    return new KiteParquetMergeJobConfigurator();
+  }
+}
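As a hedged illustration (not part of this patch), the factory is meant to be obtained through ParquetJobConfiguratorFactoryProvider, which is how BaseSqoopTool exposes it further down in this change. The sketch class and variable names are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;

public class ParquetConfiguratorFactorySketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The provider picks the concrete factory; with this patch that is the Kite implementation.
    ParquetJobConfiguratorFactory factory =
        ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(conf);

    ParquetImportJobConfigurator importConfigurator = factory.createParquetImportJobConfigurator();
    ParquetExportJobConfigurator exportConfigurator = factory.createParquetExportJobConfigurator();
    ParquetMergeJobConfigurator mergeConfigurator = factory.createParquetMergeJobConfigurator();

    System.out.println(importConfigurator.getClass().getSimpleName());
    System.out.println(exportConfigurator.getClass().getSimpleName());
    System.out.println(mergeConfigurator.getClass().getSimpleName());
  }
}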
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
new file mode 100644 (file)
index 0000000..9fecf28
--- /dev/null
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.mapreduce.MergeParquetMapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
+
+  public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
+
+  @Override
+  public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
+                                       Path finalPath) throws IOException {
+    try {
+      FileSystem fileSystem = finalPath.getFileSystem(conf);
+      LOG.info("Trying to merge parquet files");
+      job.setOutputKeyClass(GenericRecord.class);
+      job.setMapperClass(MergeParquetMapper.class);
+      job.setReducerClass(KiteMergeParquetReducer.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      List<Footer> footers = new ArrayList<Footer>();
+      FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
+      FileStatus newPathfileStatus = fileSystem.getFileStatus(newPath);
+      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
+      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
+
+      MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+      AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+      Schema avroSchema = avroSchemaConverter.convert(schema);
+
+      if (!fileSystem.exists(finalPath)) {
+        Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
+        DatasetKeyOutputFormat.configure(job).overwrite(dataset);
+      } else {
+        DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
+      }
+
+      job.setInputFormatClass(AvroParquetInputFormat.class);
+      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
+
+      conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
+      Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
+
+      job.setOutputFormatClass(outClass);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static Dataset createDataset(Schema schema, String uri) {
+    DatasetDescriptor descriptor =
+        new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
+    return Datasets.create(uri, descriptor, GenericRecord.class);
+  }
+}
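A small, hedged sketch of the public createDataset helper above; the record schema and the file-based dataset URI are invented for illustration and are not taken from this patch.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetMergeJobConfigurator;
import org.kitesdk.data.Dataset;

public class MergeTargetDatasetSketch {

  public static void main(String[] args) {
    // Illustrative Avro schema standing in for the schema recovered from the Parquet footers.
    Schema schema = SchemaBuilder.record("example").fields()
        .requiredInt("id")
        .optionalString("name")
        .endRecord();
    // Creates a Parquet-formatted Kite dataset behind the given URI, as the merge
    // configurator does when the final output path does not exist yet.
    Dataset dataset = KiteParquetMergeJobConfigurator.createDataset(
        schema, "dataset:file:/tmp/sqoop-merge-target");
    System.out.println("Created dataset in format: " + dataset.getDescriptor().getFormat());
  }
}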
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.mapreduce;
+package org.apache.sqoop.mapreduce.parquet.kite;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -41,12 +41,15 @@ import org.kitesdk.data.spi.SchemaValidationUtil;
 import java.io.IOException;
 import java.lang.reflect.Method;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
 /**
  * Helper class for setting up a Parquet MapReduce job.
  */
-public final class ParquetJob {
+public final class KiteParquetUtils {
 
-  public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
+  public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName());
 
   public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
 
@@ -66,22 +69,16 @@ public final class ParquetJob {
 
   private static final String HIVE_URI_PREFIX = "dataset:hive";
 
-  private ParquetJob() {
+  private KiteParquetUtils() {
   }
 
-  private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
-  static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
-  enum WriteMode {
+  public enum WriteMode {
     DEFAULT, APPEND, OVERWRITE
   };
 
-  public static Schema getAvroSchema(Configuration conf) {
-    return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
-  }
-
   public static CompressionType getCompressionType(Configuration conf) {
     CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
-    String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
+    String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName());
     try {
       return CompressionType.forName(codec);
     } catch (IllegalArgumentException ex) {
@@ -129,7 +126,7 @@ public final class ParquetJob {
     } else {
       dataset = createDataset(schema, getCompressionType(conf), uri);
     }
-    conf.set(CONF_AVRO_SCHEMA, schema.toString());
+    conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
 
     DatasetKeyOutputFormat.ConfigBuilder builder =
         DatasetKeyOutputFormat.configure(conf);
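For context, a hedged sketch (not part of this patch) of how the renamed codec key drives compression selection. The "snappy" value is an assumed example and the sketch class name is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
import org.kitesdk.data.CompressionType;

import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;

public class ParquetCompressionSketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Sqoop sets this key from the user's compression options; "snappy" is an assumed example value.
    conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, "snappy");
    // Falls back to the Parquet format's default compression when the key is not set.
    CompressionType compressionType = KiteParquetUtils.getCompressionType(conf);
    System.out.println("Resolved Parquet compression: " + compressionType.getName());
  }
}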
index e4b1350..ea2b064 100644 (file)
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.mapreduce.JdbcExportJob;
-
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 
 /**
@@ -42,15 +42,16 @@ public class PostgreSQLCopyExportJob extends JdbcExportJob {
   public static final Log LOG =
     LogFactory.getLog(PostgreSQLCopyExportJob.class.getName());
 
-  public PostgreSQLCopyExportJob(final ExportJobContext context) {
-    super(context);
+  public PostgreSQLCopyExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(context, parquetExportJobConfigurator);
   }
 
   public PostgreSQLCopyExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
-    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
   }
 
   @Override
index 783651a..c62ee98 100644 (file)
@@ -34,9 +34,12 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.sqoop.manager.SupportedManagers;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.password.CredentialProviderHelper;
@@ -1904,4 +1907,8 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
     }
 
   }
+
+  public ParquetJobConfiguratorFactory getParquetJobConfigurator(Configuration configuration) {
+    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(configuration);
+  }
 }
index ee79d8b..2c474b7 100644 (file)
@@ -46,6 +46,7 @@ import org.apache.sqoop.hive.HiveClient;
 import org.apache.sqoop.hive.HiveClientFactory;
 import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.MergeJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.metastore.JobData;
 import org.apache.sqoop.metastore.JobStorage;
 import org.apache.sqoop.metastore.JobStorageFactory;
@@ -472,7 +473,8 @@ public class ImportTool extends BaseSqoopTool {
           loadJars(options.getConf(), context.getJarFile(), context.getTableName());
         }
 
-        MergeJob mergeJob = new MergeJob(options);
+        ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
+        MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
         if (mergeJob.runMergeJob()) {
           // Rename destination directory to proper location.
           Path tmpDir = getOutputPath(options, context.getTableName());
index 311fee8..4c20f7d 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
 import org.apache.sqoop.cli.RelatedOptions;
 import org.apache.sqoop.cli.ToolOptions;
 import org.apache.sqoop.mapreduce.MergeJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.util.LoggingUtils;
 
 /**
@@ -52,7 +53,8 @@ public class MergeTool extends BaseSqoopTool {
   public int run(SqoopOptions options) {
     try {
       // Configure and execute a MapReduce job to merge these datasets.
-      MergeJob mergeJob = new MergeJob(options);
+      ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
+      MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
       if (!mergeJob.runMergeJob()) {
         LOG.error("MapReduce job failed!");
         return 1;
index 0f9c7f3..27d407a 100644 (file)
@@ -32,10 +32,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.util.ParquetReader;
 import org.junit.Test;
+import parquet.avro.AvroSchemaConverter;
 import parquet.format.CompressionCodec;
 import parquet.hadoop.Footer;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -311,8 +313,9 @@ public class TestParquetImport extends ImportJobTestCase {
   }
 
   private Schema getSchema() {
-    String schemaString = getOutputMetadata().getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
-    return new Schema.Parser().parse(schemaString);
+    MessageType parquetSchema = getOutputMetadata().getFileMetaData().getSchema();
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+    return avroSchemaConverter.convert(parquetSchema);
   }
 
   private void checkField(Field field, String name, Type type) {
index 77674db..436f0e5 100644 (file)
@@ -35,7 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.mapreduce.ParquetJob;
+import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
 import org.apache.sqoop.util.ParquetReader;
 import org.junit.After;
 import org.junit.Before;
@@ -404,7 +404,7 @@ public class TestHiveImport extends ImportJobTestCase {
     createTableWithColTypes(types, vals);
 
     thrown.expect(AvroSchemaMismatchException.class);
-    thrown.expectMessage(ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
+    thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
 
     SqoopOptions sqoopOptions = getSqoopOptions(getConf());
     sqoopOptions.setThrowOnError(true);
@@ -422,7 +422,7 @@ public class TestHiveImport extends ImportJobTestCase {
             .name(getColName(2)).type().nullable().stringType().noDefault()
             .endRecord();
     String dataSetUri = "dataset:hive:/default/" + tableName;
-    ParquetJob.createDataset(dataSetSchema, ParquetJob.getCompressionType(new Configuration()), dataSetUri);
+    KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri);
   }
 
   /**
index a900b1c..81ab677 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 import org.junit.Test;
 
 import org.apache.sqoop.SqoopOptions;
@@ -117,7 +118,7 @@ public class TestJdbcExportJob {
     when(mockConnManager.getColumnTypes(anyString(), anyString())).thenReturn(columnTypeInts);
     when(mockConnManager.toJavaType(anyString(), anyString(), anyInt())).thenReturn("String");
     when(mockContext.getConnManager()).thenReturn(mockConnManager);
-    JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext) {
+    JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext, mock(ParquetExportJobConfigurator.class)) {
       @Override
       protected FileType getInputFileType() {
         return inputFileType;
index a133e58..be62efd 100644 (file)
@@ -19,6 +19,7 @@
 package org.apache.sqoop.mapreduce.mainframe;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -56,7 +58,7 @@ public class TestMainframeImportJob {
     Path path = new Path("dummyPath");
     ImportJobContext context = new ImportJobContext(tableName, jarFile,
         options, path);
-    mfImportJob = new MainframeImportJob(options, context);
+    mfImportJob = new MainframeImportJob(options, context, mock(ParquetImportJobConfigurator.class));
 
     // To access protected method by means of reflection
     Class[] types = {};
@@ -79,7 +81,7 @@ public class TestMainframeImportJob {
     options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
     ImportJobContext context = new ImportJobContext(tableName, jarFile,
         options, path);
-    avroImportJob = new MainframeImportJob(options, context);
+    avroImportJob = new MainframeImportJob(options, context, mock(ParquetImportJobConfigurator.class));
 
     // To access protected method by means of reflection
     Class[] types = {};