SQOOP-3329: Remove Kite dependency from the Sqoop project
authorBoglarka Egyed <bogi@apache.org>
Fri, 20 Jul 2018 07:36:39 +0000 (09:36 +0200)
committerBoglarka Egyed <bogi@apache.org>
Fri, 20 Jul 2018 07:36:39 +0000 (09:36 +0200)
(Szabolcs Vasas via Boglarka Egyed)

20 files changed:
ivy.xml
ivy/libraries.properties
src/docs/user/hive-notes.txt
src/docs/user/import.txt
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java [deleted file]
src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java [deleted file]
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/test/org/apache/sqoop/TestMerge.java
src/test/org/apache/sqoop/TestParquetExport.java
src/test/org/apache/sqoop/TestParquetImport.java
src/test/org/apache/sqoop/hive/TestHiveImport.java
src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java

diff --git a/ivy.xml b/ivy.xml
index 1f587f3..796ef70 100644 (file)
--- a/ivy.xml
+++ b/ivy.xml
@@ -114,15 +114,7 @@ under the License.
       conf="common->default;redist->default"/>
     <dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}"
       conf="common->default;redist->default"/>
-    <dependency org="org.kitesdk" name="kite-data-mapreduce" rev="${kite-data.version}"
-      conf="common->default;redist->default">
-      <exclude org="org.apache.avro" module="avro" />
-      </dependency>
-    <dependency org="org.kitesdk" name="kite-data-hive" rev="${kite-data.version}"
-      conf="common->default;redist->default">
-      <exclude org="com.twitter" module="parquet-hive-bundle"/>
-      <exclude org="org.apache.avro" module="avro" />
-    </dependency>
+    <dependency org="com.twitter" name="parquet-avro" rev="${parquet.version}" conf="common->default;redist->default"/>
 
     <dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="${jackson-databind.version}"
           conf="common->default;redist->default" />
index 565a8bf..c506ca8 100644 (file)
@@ -20,8 +20,6 @@
 
 avro.version=1.8.1
 
-kite-data.version=1.1.0
-
 checkstyle.version=5.0
 
 commons-cli.version=1.2
@@ -62,3 +60,4 @@ hbase.version=1.2.4
 hcatalog.version=1.2.1
 
 jackson-databind.version=2.9.5
+parquet.version=1.6.0
index af97d94..d58c4d6 100644 (file)
@@ -28,11 +28,3 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to
 +STRING+ in Hive. The +NUMERIC+ and +DECIMAL+ SQL types will be coerced to
 +DOUBLE+. In these cases, Sqoop will emit a warning in its log messages
 informing you of the loss of precision.
-
-Parquet Support in Hive
-~~~~~~~~~~~~~~~~~~~~~~~
-
-When using the Kite Dataset API based Parquet implementation in order to contact the Hive MetaStore
-from a MapReduce job, a delegation token will be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add
-Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet
-format may not work.
index a2c16d9..79f7101 100644 (file)
@@ -60,7 +60,7 @@ Argument                                    Description
 +\--as-sequencefile+                        Imports data to SequenceFiles
 +\--as-textfile+                            Imports data as plain text (default)
 +\--as-parquetfile+                         Imports data to Parquet Files
-+\--parquet-configurator-implementation+    Sets the implementation used during Parquet import. Supported values: kite, hadoop.
++\--parquet-configurator-implementation+    Sets the implementation used during Parquet import. Supported value: hadoop.
 +\--boundary-query <statement>+             Boundary query to use for creating splits
 +\--columns <col,col,col...>+               Columns to import from table
 +\--delete-target-dir+                      Delete the import target directory\
@@ -448,35 +448,14 @@ and Avro files.
 Parquet support
 +++++++++++++++
 
-Sqoop has two different implementations for importing data in Parquet format:
+Sqoop has only one implementation now for importing data in Parquet format which is based on the Parquet Hadoop API.
+Note that the legacy Kite Dataset API based implementation is removed so users have to make sure that both
++\--parquet-configurator-implementation+ option and +parquetjob.configurator.implementation+ property are unset or
+set to "hadoop".
 
-- Kite Dataset API based implementation (default, legacy)
-- Parquet Hadoop API based implementation (recommended)
-
-The users can specify the desired implementation with the +\--parquet-configurator-implementation+ option:
-
-----
-$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation kite
-----
-
-----
-$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation hadoop
-----
-
-If the +\--parquet-configurator-implementation+ option is not present then Sqoop will check the value of +parquetjob.configurator.implementation+
-property (which can be specified using -D in the Sqoop command or in the site.xml). If that value is also absent Sqoop will
-default to Kite Dataset API based implementation.
-
-The Kite Dataset API based implementation executes the import command on a different code
-path than the text import: it creates the Hive table based on the generated Avro schema by connecting to the Hive metastore.
-This can be a disadvantage since sometimes moving from the text file format to the Parquet file format can lead to many
-unexpected behavioral changes. Kite checks the Hive table schema before importing the data into it so if the user wants
-to import some data which has a schema incompatible with the Hive table's schema Sqoop will throw an error. This implementation
-uses snappy codec for compression by default and apart from this it supports the bzip codec too.
-
-The Parquet Hadoop API based implementation builds the Hive CREATE TABLE statement and executes the
-LOAD DATA INPATH command just like the text import does. Unlike Kite it also supports connecting to HiveServer2 (using the +\--hs2-url+ option)
-so it provides better security features. This implementation does not check the Hive table's schema before importing so
+The default Parquet import implementation builds the Hive CREATE TABLE statement and executes the
+LOAD DATA INPATH command just like the text import does. It supports connecting to HiveServer2 (using the +\--hs2-url+ option)
+but it does not check the Hive table's schema before importing so
 it is possible that the user can successfully import data into Hive but they get an error during a Hive read operation later.
 It does not use any compression by default but supports snappy and bzip codecs.
 
@@ -487,6 +466,8 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-par
 --parquet-configurator-implementation hadoop --hs2-url "jdbc:hive2://hs2.foo.com:10000" --hs2-keytab "/path/to/keytab"
 ----
 
+Note that +\--parquet-configurator-implementation hadoop+ is now optional.
+
 Enabling Logical Types in Avro and Parquet import for numbers
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
index cc1b752..f97dbfd 100644 (file)
@@ -53,7 +53,7 @@ import org.apache.sqoop.util.RandomHash;
 import org.apache.sqoop.util.StoredAsProperty;
 
 import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
 import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier;
 
 /**
@@ -1161,7 +1161,7 @@ public class SqoopOptions implements Cloneable {
     // set escape column mapping to true
     this.escapeColumnMappingEnabled = true;
 
-    this.parquetConfiguratorImplementation = KITE;
+    this.parquetConfiguratorImplementation = HADOOP;
   }
 
   /**
index 050c854..c6b576d 100644 (file)
 package org.apache.sqoop.mapreduce.parquet;
 
 import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory;
-import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
 
 /**
  * An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}.
  * The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects.
  */
 public enum ParquetJobConfiguratorImplementation {
-  KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class);
+  HADOOP(HadoopParquetJobConfiguratorFactory.class);
 
   private Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass;
 
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
deleted file mode 100644 (file)
index 02816d7..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.sqoop.mapreduce.MergeParquetReducer;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API.
- */
-public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
-
-  @Override
-  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
-    context.write(record, null);
-  }
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
deleted file mode 100644 (file)
index 6ebc5a3..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API.
- */
-public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
-
-  @Override
-  public void configureInputFormat(Job job, Path inputPath) throws IOException {
-    String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
-    DatasetKeyInputFormat.configure(job).readFrom(uri);
-  }
-
-  @Override
-  public Class<? extends Mapper> getMapperClass() {
-    return KiteParquetExportMapper.class;
-  }
-
-  @Override
-  public Class<? extends InputFormat> getInputFormatClass() {
-    return DatasetKeyInputFormat.class;
-  }
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
deleted file mode 100644 (file)
index 122ff3f..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API.
- */
-public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
-
-  @Override
-  protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
-    context.write(toSqoopRecord(key), NullWritable.get());
-  }
-
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
deleted file mode 100644 (file)
index 7e179a2..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.SqoopOptions;
-import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-
-import java.io.IOException;
-
-/**
- * An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API.
- */
-public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
-
-  public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
-
-  @Override
-  public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
-    JobConf conf = (JobConf) job.getConfiguration();
-    String uri = getKiteUri(conf, options, tableName, destination);
-    KiteParquetUtils.WriteMode writeMode;
-
-    if (options.doHiveImport()) {
-      if (options.doOverwriteHiveTable()) {
-        writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
-      } else {
-        writeMode = KiteParquetUtils.WriteMode.APPEND;
-        if (Datasets.exists(uri)) {
-          LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
-              "append data into the existing Hive table. Consider using " +
-              "--hive-overwrite, if you do NOT intend to do appending.");
-        }
-      }
-    } else {
-      // Note that there is no such an import argument for overwriting HDFS
-      // dataset, so overwrite mode is not supported yet.
-      // Sqoop's append mode means to merge two independent datasets. We
-      // choose DEFAULT as write mode.
-      writeMode = KiteParquetUtils.WriteMode.DEFAULT;
-    }
-    KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
-  }
-
-  @Override
-  public Class<? extends Mapper> getMapperClass() {
-    return KiteParquetImportMapper.class;
-  }
-
-  @Override
-  public Class<? extends OutputFormat> getOutputFormatClass() {
-    return DatasetKeyOutputFormat.class;
-  }
-
-  @Override
-  public boolean isHiveImportNeeded() {
-    return false;
-  }
-
-  private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
-    if (options.doHiveImport()) {
-      String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
-          options.getHiveDatabaseName();
-      String hiveTable = options.getHiveTableName() == null ? tableName :
-          options.getHiveTableName();
-      return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
-    } else {
-      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
-    }
-  }
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
deleted file mode 100644 (file)
index 0a91e4a..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.sqoop.avro.AvroUtil;
-import org.apache.sqoop.lib.LargeObjectLoader;
-import org.apache.sqoop.mapreduce.ParquetImportMapper;
-
-import java.io.IOException;
-
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-
-/**
- * An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API.
- */
-public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
-
-  @Override
-  protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
-    return new LargeObjectLoader(conf, workPath);
-  }
-
-  @Override
-  protected Schema getAvroSchema(Configuration configuration) {
-    String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
-    return AvroUtil.parseAvroSchema(schemaString);
-  }
-
-  @Override
-  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
-    context.write(record, null);
-  }
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
deleted file mode 100644 (file)
index bd07c09..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
-import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
-import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
-
-/**
- * A concrete factory implementation which produces configurator objects using the Kite Dataset API.
- */
-public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
-
-  @Override
-  public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
-    return new KiteParquetImportJobConfigurator();
-  }
-
-  @Override
-  public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
-    return new KiteParquetExportJobConfigurator();
-  }
-
-  @Override
-  public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
-    return new KiteParquetMergeJobConfigurator();
-  }
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
deleted file mode 100644 (file)
index ed045cd..0000000
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.sqoop.mapreduce.MergeParquetMapper;
-import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import parquet.avro.AvroParquetInputFormat;
-import parquet.avro.AvroSchemaConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-
-/**
- * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API.
- */
-public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
-
-  public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
-
-  @Override
-  public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
-                                       Path finalPath) throws IOException {
-    try {
-      FileSystem fileSystem = finalPath.getFileSystem(conf);
-      LOG.info("Trying to merge parquet files");
-      job.setOutputKeyClass(GenericRecord.class);
-      job.setMapperClass(MergeParquetMapper.class);
-      job.setReducerClass(KiteMergeParquetReducer.class);
-      job.setOutputValueClass(NullWritable.class);
-
-      List<Footer> footers = new ArrayList<Footer>();
-      FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
-      FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
-      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
-      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
-
-      MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
-      AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-      Schema avroSchema = avroSchemaConverter.convert(schema);
-
-      if (!fileSystem.exists(finalPath)) {
-        Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
-        DatasetKeyOutputFormat.configure(job).overwrite(dataset);
-      } else {
-        DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
-      }
-
-      job.setInputFormatClass(AvroParquetInputFormat.class);
-      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
-
-      conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
-      Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
-
-      job.setOutputFormatClass(outClass);
-    } catch (Exception cnfe) {
-      throw new IOException(cnfe);
-    }
-  }
-
-  public static Dataset createDataset(Schema schema, String uri) {
-    DatasetDescriptor descriptor =
-        new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
-    return Datasets.create(uri, descriptor, GenericRecord.class);
-  }
-}
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
deleted file mode 100644 (file)
index a4768c9..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce.parquet.kite;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.hive.HiveConfig;
-import org.kitesdk.data.CompressionType;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import org.kitesdk.data.spi.SchemaValidationUtil;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
-import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
-
-/**
- * Helper class using the Kite Dataset API for setting up a Parquet MapReduce job.
- */
-public final class KiteParquetUtils {
-
-  public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName());
-
-  public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
-
-  public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
-  // Purposefully choosing the same token alias as the one Oozie chooses.
-  // Make sure we don't generate a new delegation token if oozie
-  // has already generated one.
-  public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
-
-  public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
-
-  public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
-      "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
-      " but it is possible that date/timestamp types were mapped to strings during table" +
-      " creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
-      " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
-
-  private static final String HIVE_URI_PREFIX = "dataset:hive";
-
-  private KiteParquetUtils() {
-  }
-
-  public enum WriteMode {
-    DEFAULT, APPEND, OVERWRITE
-  };
-
-  public static CompressionType getCompressionType(Configuration conf) {
-    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
-    String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName());
-    try {
-      return CompressionType.forName(codec);
-    } catch (IllegalArgumentException ex) {
-      LOG.warn(String.format(
-          "Unsupported compression type '%s'. Fallback to '%s'.",
-          codec, defaults));
-    }
-    return defaults;
-  }
-
-  /**
-   * Configure the import job. The import process will use a Kite dataset to
-   * write data records into Parquet format internally. The input key class is
-   * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
-   * {@link org.apache.avro.generic.GenericRecord}.
-   */
-  public static void configureImportJob(JobConf conf, Schema schema,
-      String uri, WriteMode writeMode) throws IOException {
-    Dataset dataset;
-
-    // Add hive delegation token only if we don't already have one.
-    if (isHiveImport(uri)) {
-      Configuration hiveConf = HiveConfig.getHiveConf(conf);
-      if (isSecureMetastore(hiveConf)) {
-        // Copy hive configs to job config
-        HiveConfig.addHiveConfigs(hiveConf, conf);
-
-        if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
-          addHiveDelegationToken(conf);
-        }
-      }
-    }
-
-    if (Datasets.exists(uri)) {
-      if (WriteMode.DEFAULT.equals(writeMode)) {
-        throw new IOException("Destination exists! " + uri);
-      }
-
-      dataset = Datasets.load(uri);
-      Schema writtenWith = dataset.getDescriptor().getSchema();
-      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
-        String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
-        throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
-      }
-    } else {
-      dataset = createDataset(schema, getCompressionType(conf), uri);
-    }
-    conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
-
-    DatasetKeyOutputFormat.ConfigBuilder builder =
-        DatasetKeyOutputFormat.configure(conf);
-    if (WriteMode.OVERWRITE.equals(writeMode)) {
-      builder.overwrite(dataset);
-    } else if (WriteMode.APPEND.equals(writeMode)) {
-      builder.appendTo(dataset);
-    } else {
-      builder.writeTo(dataset);
-    }
-  }
-
-  private static boolean isHiveImport(String importUri) {
-    return importUri.startsWith(HIVE_URI_PREFIX);
-  }
-
-  public static Dataset createDataset(Schema schema,
-      CompressionType compressionType, String uri) {
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schema(schema)
-        .format(Formats.PARQUET)
-        .compressionType(compressionType)
-        .build();
-    return Datasets.create(uri, descriptor, GenericRecord.class);
-  }
-
-  private static boolean isSecureMetastore(Configuration conf) {
-    return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
-  }
-
-  /**
-   * Add hive delegation token to credentials store.
-   * @param conf
-   */
-  private static void addHiveDelegationToken(JobConf conf) {
-    // Need to use reflection since there's no compile time dependency on the client libs.
-    Class<?> HiveConfClass;
-    Class<?> HiveMetaStoreClientClass;
-
-    try {
-      HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
-          + " when adding hive delegation token. "
-          + "Make sure HIVE_CONF_DIR is set correctly.", ex);
-      throw new RuntimeException("Couldn't fetch delegation token.", ex);
-    }
-
-    try {
-      HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
-          + " when adding hive delegation token."
-          + " Make sure HIVE_CONF_DIR is set correctly.", ex);
-      throw new RuntimeException("Couldn't fetch delegation token.", ex);
-    }
-
-    try {
-      Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
-          HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
-      );
-      // getDelegationToken(String kerberosPrincial)
-      Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
-      Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
-
-      // Load token
-      Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
-      metastoreToken.decodeFromUrlString(tokenStringForm.toString());
-      conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
-
-      LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
-    } catch (Exception ex) {
-      LOG.error("Couldn't fetch delegation token.", ex);
-      throw new RuntimeException("Couldn't fetch delegation token.", ex);
-    }
-  }
-
-  private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
-    String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
-
-    if (hiveImport) {
-      exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
-    }
-
-    return exceptionMessage;
-  }
-
-}
index 87fc5e9..9dcbdd5 100644 (file)
@@ -21,7 +21,6 @@ package org.apache.sqoop.tool;
 import static java.lang.String.format;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf;
 
 import java.io.File;
@@ -1587,15 +1586,6 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
         + "importing into SequenceFile format.");
     }
 
-    // Hive import and create hive table not compatible for ParquetFile format when using Kite
-    if (options.doHiveImport()
-        && options.doFailIfHiveTableExists()
-        && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile
-        && options.getParquetConfiguratorImplementation() == KITE) {
-      throw new InvalidOptionsException("Hive import and create hive table is not compatible with "
-        + "importing into ParquetFile format using Kite.");
-      }
-
     if (options.doHiveImport()
         && options.getIncrementalMode().equals(IncrementalMode.DateLastModified)) {
       throw new InvalidOptionsException(HIVE_IMPORT_WITH_LASTMODIFIED_NOT_SUPPORTED);
index 2b3280a..b283174 100644 (file)
@@ -27,7 +27,6 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.manager.ConnManager;
@@ -54,8 +53,6 @@ import org.apache.sqoop.util.ParquetReader;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.junit.Assert.fail;
 
 /**
@@ -84,8 +81,6 @@ public class TestMerge extends BaseSqoopTestCase {
       Arrays.asList(new Integer(1), new Integer(43)),
       Arrays.asList(new Integer(3), new Integer(313)));
 
-  private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE;
-
   @Before
   public void setUp() {
     super.setUp();
@@ -118,7 +113,6 @@ public class TestMerge extends BaseSqoopTestCase {
   public SqoopOptions getSqoopOptions(Configuration conf) {
     SqoopOptions options = new SqoopOptions(conf);
     options.setConnectString(HsqldbTestServer.getDbUrl());
-    options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation);
 
     return options;
   }
@@ -164,14 +158,7 @@ public class TestMerge extends BaseSqoopTestCase {
   }
 
   @Test
-  public void testParquetFileMergeHadoop() throws Exception {
-    parquetJobConfiguratorImplementation = HADOOP;
-    runMergeTest(SqoopOptions.FileLayout.ParquetFile);
-  }
-
-  @Test
-  public void testParquetFileMergeKite() throws Exception {
-    parquetJobConfiguratorImplementation = KITE;
+  public void testParquetFileMerge() throws Exception {
     runMergeTest(SqoopOptions.FileLayout.ParquetFile);
   }
 
index 0fab188..be1d816 100644 (file)
@@ -18,9 +18,6 @@
 
 package org.apache.sqoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.testutil.ExportJobTestCase;
 import com.google.common.collect.Lists;
@@ -32,8 +29,6 @@ import org.junit.Rule;
 
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import parquet.avro.AvroParquetWriter;
 
 import java.io.IOException;
@@ -44,7 +39,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
@@ -58,23 +52,11 @@ import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
 /**
  * Test that we can export Parquet Data Files from HDFS into databases.
  */
-@RunWith(Parameterized.class)
 public class TestParquetExport extends ExportJobTestCase {
 
-  @Parameterized.Parameters(name = "parquetImplementation = {0}")
-  public static Iterable<? extends Object> parquetImplementationParameters() {
-    return Arrays.asList("kite", "hadoop");
-  }
-
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  private final String parquetImplementation;
-
-  public TestParquetExport(String parquetImplementation) {
-    this.parquetImplementation = parquetImplementation;
-  }
-
   /**
    * @return an argv for the CodeGenTool to use when creating tables to export.
    */
@@ -144,8 +126,6 @@ public class TestParquetExport extends ExportJobTestCase {
 
   /**
    * Create a data file that gets exported to the db.
-   * Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files
-   * but since we do not use Kite in our test cases anymore we generate the .metadata directory here.
    * @param numRecords how many records to write to the file.
    */
   protected void createParquetFile(int numRecords,
@@ -153,7 +133,6 @@ public class TestParquetExport extends ExportJobTestCase {
 
     Schema schema = buildSchema(extraCols);
 
-    createMetadataDir(schema);
     String fileName = UUID.randomUUID().toString() + ".parquet";
     Path filePath = new Path(getTablePath(), fileName);
     try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) {
@@ -167,25 +146,6 @@ public class TestParquetExport extends ExportJobTestCase {
     }
   }
 
-  private void createMetadataDir(Schema schema) throws IOException {
-    final String descriptorFileTemplate = "location=file\\:%s\n" +
-        "    version=1\n" +
-        "    compressionType=snappy\n" +
-        "    format=parquet\n";
-    Path metadataDirPath = new Path(getTablePath(), ".metadata");
-    Path schemaFile = new Path(metadataDirPath, "schema.avsc");
-    Path descriptorFile = new Path(metadataDirPath, "descriptor.properties");
-    FileSystem fileSystem = getTablePath().getFileSystem(new Configuration());
-    fileSystem.mkdirs(metadataDirPath);
-
-    try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) {
-      fileOs.write(schema.toString().getBytes());
-    }
-    try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) {
-      fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes());
-    }
-  }
-
   private Schema buildSchema(ColumnGenerator... extraCols) {
     List<Field> fields = new ArrayList<Field>();
     fields.add(buildField("id", Schema.Type.INT));
@@ -492,11 +452,4 @@ public class TestParquetExport extends ExportJobTestCase {
     thrown.reportMissingExceptionWithMessage("Expected Exception on missing Parquet fields");
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
   }
-
-  @Override
-  protected Configuration getConf() {
-    Configuration conf = super.getConf();
-    conf.set("parquetjob.configurator.implementation", parquetImplementation);
-    return conf;
-  }
 }
index b1488e8..2810e31 100644 (file)
@@ -18,7 +18,6 @@
 
 package org.apache.sqoop;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -31,9 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.util.ParquetReader;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -48,32 +44,15 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests --as-parquetfile.
  */
-@RunWith(Parameterized.class)
 public class TestParquetImport extends ImportJobTestCase {
 
   public static final Log LOG = LogFactory
       .getLog(TestParquetImport.class.getName());
 
-  private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite";
-
-  private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop";
-
-  @Parameters(name = "parquetImplementation = {0}")
-  public static Iterable<? extends Object> parquetImplementationParameters() {
-    return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP);
-  }
-
-  private final String parquetImplementation;
-
-  public TestParquetImport(String parquetImplementation) {
-    this.parquetImplementation = parquetImplementation;
-  }
-
   /**
    * Create the argv to pass to Sqoop.
    *
@@ -136,27 +115,17 @@ public class TestParquetImport extends ImportJobTestCase {
   }
 
   @Test
-  public void testHadoopGzipCompression() throws IOException {
-    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
+  public void testGzipCompression() throws IOException {
     runParquetImportTest("gzip");
   }
 
-  @Test
-  public void testKiteDeflateCompression() throws IOException {
-    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation));
-    // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
-    // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
-    runParquetImportTest("deflate", "gzip");
-  }
-
   /**
    * This test case is added to document that the deflate codec is not supported with
    * the Hadoop Parquet implementation so Sqoop throws an exception when it is specified.
    * @throws IOException
    */
   @Test(expected = IOException.class)
-  public void testHadoopDeflateCompression() throws IOException {
-    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));
+  public void testDeflateCompression() throws IOException {
     runParquetImportTest("deflate");
   }
 
@@ -334,11 +303,4 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(Type.NULL, field.schema().getTypes().get(0).getType());
     assertEquals(type, field.schema().getTypes().get(1).getType());
   }
-
-  @Override
-  protected Configuration getConf() {
-    Configuration conf = super.getConf();
-    conf.set("parquetjob.configurator.implementation", parquetImplementation);
-    return conf;
-  }
 }
index 436f0e5..a6c8e10 100644 (file)
@@ -23,20 +23,12 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.sqoop.Sqoop;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
-import org.apache.sqoop.util.ParquetReader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,7 +46,6 @@ import org.apache.sqoop.tool.SqoopTool;
 import org.apache.commons.cli.ParseException;
 import org.junit.rules.ExpectedException;
 
-import static java.util.Collections.sort;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -284,54 +275,6 @@ public class TestHiveImport extends ImportJobTestCase {
         getArgv(false, null), new ImportTool());
   }
 
-  /** Test that strings and ints are handled in the normal fashion as parquet
-   * file. */
-  @Test
-  public void testNormalHiveImportAsParquet() throws IOException {
-    final String TABLE_NAME = "normal_hive_import_as_parquet";
-    setCurTableName(TABLE_NAME);
-    setNumCols(3);
-    String [] types = getTypes();
-    String [] vals = { "'test'", "42", "'somestring'" };
-    String [] extraArgs = {"--as-parquetfile"};
-
-    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
-        new ImportTool());
-    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
-  }
-
-  private void verifyHiveDataset(Object[][] valsArray) {
-    List<String> expected = getExpectedLines(valsArray);
-    List<String> result = new ParquetReader(getTablePath()).readAllInCsv();
-
-    sort(expected);
-    sort(result);
-
-    assertEquals(expected, result);
-  }
-
-  private List<String> getExpectedLines(Object[][] valsArray) {
-    List<String> expectations = new ArrayList<>();
-    if (valsArray != null) {
-      for (Object[] vals : valsArray) {
-        expectations.add(toCsv(vals));
-      }
-    }
-    return expectations;
-  }
-
-  private String toCsv(Object[] vals) {
-    StringBuilder result = new StringBuilder();
-
-    for (Object val : vals) {
-      result.append(val).append(",");
-    }
-
-    result.deleteCharAt(result.length() - 1);
-
-    return result.toString();
-  }
-
   /** Test that table is created in hive with no data import. */
   @Test
   public void testCreateOnlyHiveImport() throws IOException {
@@ -365,108 +308,6 @@ public class TestHiveImport extends ImportJobTestCase {
         new CreateHiveTableTool());
   }
 
-  /**
-   * Test that table is created in hive and replaces the existing table if
-   * any.
-   */
-  @Test
-  public void testCreateOverwriteHiveImportAsParquet() throws IOException {
-    final String TABLE_NAME = "create_overwrite_hive_import_as_parquet";
-    setCurTableName(TABLE_NAME);
-    setNumCols(3);
-    String [] types = getTypes();
-    String [] vals = { "'test'", "42", "'somestring'" };
-    String [] extraArgs = {"--as-parquetfile"};
-    ImportTool tool = new ImportTool();
-
-    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
-    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
-
-    String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
-    String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
-    runImportTest(TABLE_NAME, types, valsToOverwrite, "",
-        getArgv(false, extraArgsForOverwrite), tool);
-    verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}});
-  }
-
-  @Test
-  public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
-    final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE";
-    setCurTableName(TABLE_NAME);
-    setNumCols(3);
-
-    String [] types = { "VARCHAR(32)", "INTEGER", "DATE" };
-    String [] vals = { "'test'", "42", "'2009-12-31'" };
-    String [] extraArgs = {"--as-parquetfile"};
-
-    createHiveDataSet(TABLE_NAME);
-
-    createTableWithColTypes(types, vals);
-
-    thrown.expect(AvroSchemaMismatchException.class);
-    thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
-
-    SqoopOptions sqoopOptions = getSqoopOptions(getConf());
-    sqoopOptions.setThrowOnError(true);
-    Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions);
-    sqoop.run(getArgv(false, extraArgs));
-
-  }
-
-  private void createHiveDataSet(String tableName) {
-    Schema dataSetSchema = SchemaBuilder
-        .record(tableName)
-            .fields()
-            .name(getColName(0)).type().nullable().stringType().noDefault()
-            .name(getColName(1)).type().nullable().stringType().noDefault()
-            .name(getColName(2)).type().nullable().stringType().noDefault()
-            .endRecord();
-    String dataSetUri = "dataset:hive:/default/" + tableName;
-    KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri);
-  }
-
-  /**
-   * Test that records are appended to an existing table.
-   */
-  @Test
-  public void testAppendHiveImportAsParquet() throws IOException {
-    final String TABLE_NAME = "append_hive_import_as_parquet";
-    setCurTableName(TABLE_NAME);
-    setNumCols(3);
-    String [] types = getTypes();
-    String [] vals = { "'test'", "42", "'somestring'" };
-    String [] extraArgs = {"--as-parquetfile"};
-    String [] args = getArgv(false, extraArgs);
-    ImportTool tool = new ImportTool();
-
-    runImportTest(TABLE_NAME, types, vals, "", args, tool);
-    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
-
-    String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
-    runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
-    verifyHiveDataset(new Object[][] {
-        {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
-  }
-
-  /**
-   * Test hive create and --as-parquetfile options validation.
-   */
-  @Test
-  public void testCreateHiveImportAsParquet() throws ParseException, InvalidOptionsException {
-    final String TABLE_NAME = "CREATE_HIVE_IMPORT_AS_PARQUET";
-    setCurTableName(TABLE_NAME);
-    setNumCols(3);
-    String [] extraArgs = {"--as-parquetfile", "--create-hive-table"};
-    ImportTool tool = new ImportTool();
-
-    thrown.expect(InvalidOptionsException.class);
-    thrown.reportMissingExceptionWithMessage("Expected InvalidOptionsException during Hive table creation with " +
-        "--as-parquetfile");
-    tool.validateOptions(tool.parseArguments(getArgv(false, extraArgs), null,
-        null, true));
-  }
-
-
   /** Test that dates are coerced properly to strings. */
   @Test
   public void testDate() throws IOException {
index dbda8b7..5571b25 100644 (file)
@@ -28,7 +28,6 @@ import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
 import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
-import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -124,7 +123,7 @@ public class TestBaseSqoopTool {
   public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception {
     when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid");
 
-    exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]");
+    exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [HADOOP]");
     testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
   }
 
@@ -132,6 +131,6 @@ public class TestBaseSqoopTool {
   public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception {
     testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
 
-    assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation());
+    assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation());
   }
 }