SQOOP-931: Integrate HCatalog with Sqoop
authorJarek Jarcec Cecho <jarcec@apache.org>
Fri, 7 Jun 2013 14:33:21 +0000 (07:33 -0700)
committerJarek Jarcec Cecho <jarcec@apache.org>
Fri, 7 Jun 2013 14:33:21 +0000 (07:33 -0700)
(Venkat Ranganathan via Jarek Jarcec Cecho)

38 files changed:
bin/configure-sqoop
bin/configure-sqoop.cmd
build.xml
ivy.xml
ivy/ivysettings.xml
src/docs/user/SqoopUserGuide.txt
src/docs/user/hcatalog.txt [new file with mode: 0644]
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/config/ConfigurationConstants.java
src/java/org/apache/sqoop/hive/HiveImport.java
src/java/org/apache/sqoop/manager/ConnManager.java
src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
src/java/org/apache/sqoop/mapreduce/JobBase.java
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java [new file with mode: 0644]
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/CodeGenTool.java
src/java/org/apache/sqoop/tool/ExportTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/perftest/ExportStressTest.java
src/test/com/cloudera/sqoop/ThirdPartyTests.java
src/test/com/cloudera/sqoop/hive/TestHiveImport.java
src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
src/test/org/apache/sqoop/hcat/HCatalogExportTest.java [new file with mode: 0644]
src/test/org/apache/sqoop/hcat/HCatalogImportTest.java [new file with mode: 0644]
src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java [new file with mode: 0644]
src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java [new file with mode: 0644]
testdata/hcatalog/conf/hive-log4j.properties [new file with mode: 0644]
testdata/hcatalog/conf/hive-site.xml [new file with mode: 0644]
testdata/hcatalog/conf/log4j.properties [new file with mode: 0644]

index 61ff3f2..178720d 100755 (executable)
@@ -54,9 +54,22 @@ if [ -z "${HADOOP_MAPRED_HOME}" ]; then
     HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
   fi
 fi
+
+# We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+# so that hcat script works correctly on BigTop
+if [ -z "${HADOOP_HOME}" ]; then
+  if [ -n "${HADOOP_COMMON_HOME}" ]; then
+     HADOOP_HOME=${HADOOP_COMMON_HOME}
+     export HADOOP_HOME
+  fi
+fi
+
 if [ -z "${HBASE_HOME}" ]; then
   HBASE_HOME=/usr/lib/hbase
 fi
+if [ -z "${HCAT_HOME}" ]; then
+  HCAT_HOME=/usr/lib/hcatalog
+fi
 
 # Check: If we can't find our dependencies, give up here.
 if [ ! -d "${HADOOP_COMMON_HOME}" ]; then
@@ -76,6 +89,12 @@ if [ ! -d "${HBASE_HOME}" ]; then
   echo 'Please set $HBASE_HOME to the root of your HBase installation.'
 fi
 
+## Moved to be a runtime check in sqoop.
+if [ ! -d "${HCAT_HOME}" ]; then
+  echo "Warning: $HCAT_HOME does not exist! HCatalog jobs will fail."
+  echo 'Please set $HCAT_HOME to the root of your HCatalog installation.'
+fi
+
 # Where to find the main Sqoop jar
 SQOOP_JAR_DIR=$SQOOP_HOME
 
@@ -106,6 +125,15 @@ if [ -e "$HBASE_HOME/bin/hbase" ]; then
   SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
 fi
 
+# Add HCatalog to dependency list
+if [ -e "${HCAT_HOME}/bin/hcat" ]; then
+  TMP_SQOOP_CLASSPATH=${SQOOP_CLASSPATH}:`${HCAT_HOME}/bin/hcat -classpath`
+  if [ -z "${HIVE_CONF_DIR}" ]; then
+    TMP_SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}:${HIVE_CONF_DIR}
+  fi
+  SQOOP_CLASSPATH=${TMP_SQOOP_CLASSPATH}
+fi
+
 ZOOCFGDIR=${ZOOCFGDIR:-/etc/zookeeper}
 if [ -d "${ZOOCFGDIR}" ]; then
   SQOOP_CLASSPATH=$ZOOCFGDIR:$SQOOP_CLASSPATH
@@ -136,4 +164,6 @@ export HADOOP_CLASSPATH
 export HADOOP_COMMON_HOME
 export HADOOP_MAPRED_HOME
 export HBASE_HOME
+export HCAT_HOME
+export HIVE_CONF_DIR
 
index f5fd608..ec57e37 100644 (file)
@@ -55,6 +55,15 @@ if not defined HADOOP_MAPRED_HOME (
     exit /b 1
   )
 )
+
+:: We are setting HADOOP_HOME to HADOOP_COMMON_HOME if it is not set
+:: so that hcat script works correctly on BigTop
+if not defined HADOOP_HOME (
+  if defined HADOOP_COMMON_HOME (
+    set HADOOP_HOME=%HADOOP_COMMON_HOME%
+  )
+)
+
 :: Check for HBase dependency
 if not defined HBASE_HOME (
   if defined HBASE_VERSION (
index 636c103..b4b08e5 100644 (file)
--- a/build.xml
+++ b/build.xml
@@ -51,6 +51,7 @@
       <property name="hbase.version" value="0.90.3-cdh3u1" />
       <property name="zookeeper.version" value="3.3.3-cdh3u1" />
       <property name="hadoop.version.full" value="0.20" />
+      <property name="hcatalog.version" value="0.11.0" />
     </then>
 
     <elseif>
@@ -60,6 +61,7 @@
         <property name="hbase.version" value="0.92.0" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="0.23" />
+        <property name="hcatalog.version" value="0.11.0" />
       </then>
     </elseif>
 
@@ -70,6 +72,7 @@
         <property name="hbase.version" value="0.92.0" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="1.0.0" />
+        <property name="hcatalog.version" value="0.11.0" />
       </then>
     </elseif>
 
@@ -80,6 +83,7 @@
         <property name="hbase.version" value="0.94.2" />
         <property name="zookeeper.version" value="3.4.2" />
         <property name="hadoop.version.full" value="2.0.4-alpha" />
+        <property name="hcatalog.version" value="0.11.0" />
       </then>
     </elseif>
 
       <tarfileset dir="${build.dir}" mode="755">
         <include name="${bin.artifact.name}/bin/*" />
         <include name="${bin.artifact.name}/testdata/hive/bin/*" />
+        <include name="${bin.artifact.name}/testdata/hcatalog/conf/*" />
         <include name="${bin.artifact.name}/**/*.sh" />
       </tarfileset>
     </tar>
       <tarfileset dir="${build.dir}" mode="664">
         <exclude name="${src.artifact.name}/bin/*" />
         <exclude name="${src.artifact.name}/testdata/hive/bin/*" />
+        <exclude name="${src.artifact.name}/testdata/hcatalog/conf/*" />
         <exclude name="${src.artifact.name}/**/*.sh" />
         <include name="${src.artifact.name}/**" />
       </tarfileset>
       <tarfileset dir="${build.dir}" mode="755">
         <include name="${src.artifact.name}/bin/*" />
         <include name="${src.artifact.name}/testdata/hive/bin/*" />
+        <include name="${src.artifact.name}/testdata/hcatalog/conf/*" />
         <include name="${src.artifact.name}/**/*.sh" />
       </tarfileset>
     </tar>
   <target name="test-prep" depends="test-prep-normal,test-prep-thirdparty,
                                     test-prep-manual"/>
 
+  <path id="hcatalog.conf.dir">
+     <pathelement location="${basedir}/testdata/hcatalog/conf"/>
+  </path>
   <target name="test-eval-condition">
     <condition property="thirdparty_or_manual">
       <or>
     </condition>
   </target>
 
+
+
   <target name="test-prep-normal" unless="thirdparty_or_manual"
                                   depends="test-eval-condition">
     <!-- Set this to run all the "standard" tests -->
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
     <delete dir="${build.test}/data"/>
-    <mkdir dir="${build.test}/data" />
+    <mkdir dir="${build.test}/data/sqoop" />
     <mkdir dir="${cobertura.class.dir}" />
     <junit
       printsummary="yes" showoutput="${test.output}"
       <sysproperty key="java.security.krb5.kdc"
                    value="${java.security.krb5.kdc}"/>
 
+      <!-- Location of Hive logs -->
+      <!--<sysproperty key="hive.log.dir"
+                   value="${test.build.data}/sqoop/logs"/> -->
+
       <classpath>
         <!-- instrumented classes go ahead of normal classes -->
         <pathelement location="${cobertura.class.dir}" />
 
+        <!-- Location of hive-site xml and other hadoop config files -->
+        <path refid="hcatalog.conf.dir" />
+
         <!-- main classpath here. -->
         <path refid="test.classpath" />
 
diff --git a/ivy.xml b/ivy.xml
index 1fa4dd1..750adfc 100644 (file)
--- a/ivy.xml
+++ b/ivy.xml
@@ -37,10 +37,15 @@ under the License.
       extends="runtime"
       description="artifacts needed to compile/test the application"/>
     <conf name="hbase" visibility="private" />
-    <conf name="hadoop23" visibility="private" extends="common,runtime,hbase" />
-    <conf name="hadoop20" visibility="private" extends="common,runtime,hbase" />
-    <conf name="hadoop100" visibility="private" extends="common,runtime,hbase" />
-    <conf name="hadoop200" visibility="private" extends="common,runtime,hbase" />
+    <conf name="hcatalog" visibility="private" />
+    <conf name="hadoop23" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
+    <conf name="hadoop20" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
+    <conf name="hadoop100" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
+    <conf name="hadoop200" visibility="private"
+      extends="common,runtime,hbase,hcatalog" />
 
     <conf name="test" visibility="private" extends="common,runtime"/>
     <conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -172,6 +177,11 @@ under the License.
       <exclude org="com.cloudera.cdh" module="zookeeper-ant" />
     </dependency>
 
+    <dependency org="org.apache.hcatalog" name="hcatalog-core"
+      rev="${hcatalog.version}" conf="hcatalog->default">
+      <artifact name="hcatalog-core" type="jar"/>
+    </dependency>
+
     <exclude org="org.apache.hadoop" module="avro"/>
     <exclude org="commons-daemon" module="commons-daemon" />
     <exclude type="pom" />
index c4cc561..2920c89 100644 (file)
@@ -42,6 +42,9 @@ under the License.
   <property name="releases.cloudera.com"
       value="https://repository.cloudera.com/content/repositories/releases/"
       override="false"/>
+  <property name="www.datanucleus.org"
+      value="http://www.datanucleus.org/downloads/maven2/"
+      override="false"/>
   <property name="maven2.pattern"
       value="[organisation]/[module]/[revision]/[artifact]-[revision](-[classifier])"/>
   <property name="repo.dir" value="${user.home}/.m2/repository"/>
@@ -52,6 +55,8 @@ under the License.
   <resolvers>
     <ibiblio name="maven2" root="${repo.maven.org}"
         pattern="${maven2.pattern.ext}" m2compatible="true"/>
+    <ibiblio name="datanucleus" root="${www.datanucleus.org}"
+        pattern="${maven2.pattern.ext}" m2compatible="true"/>
     <ibiblio name="cloudera-releases" root="${releases.cloudera.com}"
         pattern="${maven2.pattern.ext}" m2compatible="true"/>
     <ibiblio name="apache-snapshot" root="${snapshot.apache.org}"
@@ -67,16 +72,18 @@ under the License.
     <chain name="default" dual="true" checkmodified="true"
         changingPattern=".*SNAPSHOT">
       <resolver ref="fs"/>
-      <resolver ref="apache-snapshot"/> 
+      <resolver ref="apache-snapshot"/>
+      <resolver ref="datanucleus"/>
       <resolver ref="cloudera-releases"/>
-      <resolver ref="cloudera-staging"/> 
+      <resolver ref="cloudera-staging"/>
       <resolver ref="maven2"/>
     </chain>
 
     <chain name="internal" dual="true">
       <resolver ref="fs"/>
-      <resolver ref="apache-snapshot"/> 
-      <resolver ref="cloudera-staging"/> 
+      <resolver ref="apache-snapshot"/>
+      <resolver ref="datanucleus"/>
+      <resolver ref="cloudera-staging"/>
       <resolver ref="maven2"/>
     </chain>
 
index 01ac1cf..2e88887 100644 (file)
@@ -72,6 +72,8 @@ include::help.txt[]
 
 include::version.txt[]
 
+include::hcatalog.txt[]
+
 include::compatibility.txt[]
 
 include::connectors.txt[]
diff --git a/src/docs/user/hcatalog.txt b/src/docs/user/hcatalog.txt
new file mode 100644 (file)
index 0000000..b8e495e
--- /dev/null
@@ -0,0 +1,313 @@
+
+////
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+////
+
+Sqoop-HCatalog Integration
+--------------------------
+
+HCatalog Background
+~~~~~~~~~~~~~~~~~~~
+
+HCatalog is a table and storage management service for Hadoop that enables
+users with different data processing tools – Pig, MapReduce, and Hive –
+to more easily read and write data on the grid. HCatalog’s table abstraction
+presents users with a relational view of data in the Hadoop distributed
+file system (HDFS) and ensures that users need not worry about where or
+in what format their data is stored: RCFile format, text files, or
+SequenceFiles.
+
+HCatalog supports reading and writing files in any format for which a Hive
+SerDe (serializer-deserializer) has been written. By default, HCatalog
+supports RCFile, CSV, JSON, and SequenceFile formats. To use a custom
+format, you must provide the InputFormat and OutputFormat as well as the SerDe.
+
+The ability of HCatalog to abstract various storage formats is used in
+providing the RCFile (and future file types) support to Sqoop.
+
+Exposing HCatalog Tables to Sqoop
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog integration with Sqoop is patterned on an existing feature set that
+supports Avro and Hive tables. Five new command line options are introduced,
+and some command line options defined for Hive are reused.
+
+New Command Line Options
+^^^^^^^^^^^^^^^^^^^^^^^^
+
++--hcatalog-database+::
+Specifies the database name for the HCatalog table. If not specified,
+the default database name +default+ is used. Providing the
++--hcatalog-database+ option without +--hcatalog-table+ is an error.
+This is not a required option.
+
++--hcatalog-table+::
+The argument value for this option is the HCatalog tablename.
+The presence of the +--hcatalog-table+ option signifies that the import
+or export job is done using HCatalog tables, and it is a required option for
+HCatalog jobs.
+
++--hcatalog-home+::
+The home directory for the HCatalog installation. The directory is
+expected to have a +lib+ subdirectory and a +share/hcatalog+ subdirectory
+with necessary HCatalog libraries. If not specified, the system property
++hcatalog.home+ will be checked and failing that, a system environment
+variable +HCAT_HOME+ will be checked.  If none of these are set, the
+default value will be used and currently the default is set to
++/usr/lib/hcatalog+.
+This is not a required option.
+
++--create-hcatalog-table+::
+
+This option specifies whether an HCatalog table should be created
+automatically when importing data. By default, HCatalog tables are assumed
+to exist. The table name will be the same as the database table name
+translated to lower case. Further described in +Automatic Table Creation+
+below.
+
++--hcatalog-storage-stanza+::
+
+This option specifies the storage stanza to be appended to the table.
+Further described in +Automatic Table Creation+ below.
+
+Supported Sqoop Hive Options
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The following Sqoop options are also used along with the +--hcatalog-table+
+option to provide additional input to the HCatalog jobs. Some of the existing
+Hive import job options are reused with HCatalog jobs instead of creating
+HCatalog-specific options for the same purpose.
+
++--map-column-hive+::
+This option maps a database column to HCatalog with a specific HCatalog
+type.
+
++--hive-home+::
+The Hive home location.
+
++--hive-partition-key+::
+Used for static partitioning filter. The partitioning key should be of
+type STRING. There can be only one static partitioning key.
+
++--hive-partition-value+::
+The value associated with the partition.
+
+Unsupported Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Unsupported Sqoop Hive Import Options
++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop Hive import options are not supported with HCatalog jobs.
+
+* +--hive-import+
+* +--hive-overwrite+
+
+Unsupported Sqoop Export and Import Options
++++++++++++++++++++++++++++++++++++++++++++
+
+The following Sqoop export and import options are not supported with HCatalog jobs.
+
+* +--direct+
+* +--export-dir+
+* +--target-dir+
+* +--warehouse-dir+
+* +--append+
+* +--as-sequencefile+
+* +--as-avrofile+
+
+Ignored Sqoop Options
+^^^^^^^^^^^^^^^^^^^^^
+
+The following options are ignored with HCatalog jobs.
+
+* All input delimiter options are ignored.
+
+* Output delimiters are generally ignored unless either
++--hive-drop-import-delims+ or +--hive-delims-replacement+ is used. When the
++--hive-drop-import-delims+ or +--hive-delims-replacement+ option is
+specified, all +CHAR+ type database table columns will be post-processed
+to either remove or replace the delimiters, respectively. See +Delimited Text
+Formats and Field and Line Delimiter Characters+ below. This is only needed
+if the HCatalog table uses text formats.
+
+Automatic Table Creation
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+One of the key features of Sqoop is to manage and create the table metadata
+when importing into Hadoop. HCatalog import jobs also provide for this
+feature with the option +--create-hcatalog-table+. Furthermore, one of the
+important benefits of the HCatalog integration is to provide storage
+agnosticism to Sqoop data movement jobs. To provide for that feature,
+HCatalog import jobs provide an option that lets a user specifiy the
+storage format for the created table.
+
+The option +--create-hcatalog-table+ is used as an indicator that a table
+has to be created as part of the HCatalog import job.  If the option 
++--create-hcatalog-table+ is specified and the table exists, then the
+table creation will fail and the job will be aborted.
+
+The option +--hcatalog-storage-stanza+ can be used to specify the storage
+format of the newly created table. The default value for this option is
++stored as rcfile+. The value specified for this option is assumed to be a
+valid Hive storage format expression. It will be appended to the +create table+
+command generated by the HCatalog import job as part of automatic table
+creation. Any error in the storage stanza will cause the table creation to
+fail and the import job will be aborted.
+
+Any additional resources needed to support the storage format referenced in
+the option +--hcatalog-storage-stanza+ should be provided to the job either
+by placing them in +$HIVE_HOME/lib+ or by providing them in +HADOOP_CLASSPATH+
+and +LIBJAR+ files.
+
+If the option +--hive-partition-key+ is specified, then the value of this
+option is used as the partitioning key for the newly created table. Only
+one partitioning key can be specified with this option.
+
+Object names are mapped to the lowercase equivalents as specified below
+when mapped to an HCatalog table. This includes the table name (which
+is the same as the external store table name converted to lower case)
+and field names.
+
+Delimited Text Formats and Field and Line Delimiter Characters
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+HCatalog supports delimited text format as one of the table storage formats.
+But when delimited text is used and the imported data has fields that contain
+those delimiters, then the data may be parsed into a different number of
+fields and records by Hive, thereby losing data fidelity.
+
+For this case, one of these existing Sqoop import options can be used:
+
+* +--hive-delims-replacement+
+
+* +--hive-drop-import-delims+
+
+If either of these options is provided for import, then any column of type
+STRING will be formatted with the Hive delimiter processing and then written
+to the HCatalog table.
+
+HCatalog Table Requirements
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The HCatalog table should be created before using it as part of a Sqoop job
+if the default table creation options (with optional storage stanza) are not
+sufficient. All storage formats supported by HCatalog can be used with the
+creation of the HCatalog tables. This makes this feature readily adopt new
+storage formats that come into the Hive project, such as ORC files.
+
+Support for Partitioning
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+The Sqoop HCatalog feature supports the following table types:
+
+* Unpartitioned tables
+
+* Partitioned tables with a static partitioning key specified
+
+* Partitioned tables with dynamic partition keys from the database
+result set
+
+* Partitioned tables with a combination of a static key and additional
+dynamic partitioning keys
+
+Schema Mapping
+~~~~~~~~~~~~~~
+
+Sqoop currently does not support column name mapping. However, the user
+is allowed to override the type mapping. Type mapping loosely follows
+the Hive type mapping already present in Sqoop except that SQL types
+“FLOAT” and “REAL” are mapped to HCatalog type “float”. In the Sqoop type
+mapping for Hive, these two are mapped to “double”. Type mapping is primarily
+used for checking the column definition correctness only and can be overridden
+with the --map-column-hive option.
+
+All types except binary are assignable to a String type.
+
+Any field of number type (int, shortint, tinyint, bigint and bigdecimal,
+float and double) is assignable to another field of any number type during
+exports and imports. Depending on the precision and scale of the target type
+of assignment, truncations can occur.
+
+Furthermore, date/time/timestamps are mapped to string (the full
+date/time/timestamp representation) or bigint (the number of milliseconds
+since epoch) during imports and exports.
+
+BLOBs and CLOBs are only supported for imports. The BLOB/CLOB objects when
+imported are stored in a Sqoop-specific format and knowledge of this format
+is needed for processing these objects in a Pig/Hive job or another Map Reduce
+job.
+
+Database column names are mapped to their lowercase equivalents when mapped
+to the HCatalog fields. Currently, case-sensitive database object names are
+not supported.
+
+Projection of a set of columns from a table to an HCatalog table or loading
+to a column projection is allowed, subject to table constraints. The dynamic
+partitioning columns, if any, must be part of the projection when importing
+data into HCatalog tables.
+
+Dynamic partitioning fields should be mapped to database columns that are
+defined with the NOT NULL attribute (although this is not validated). A null
+value during import for a dynamic partitioning column will abort the Sqoop
+job.
+
+Support for HCatalog Data Types
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+All the primitive HCatalog types are supported. Currently all the complex
+HCatalog types are unsupported.
+
+BLOB/CLOB database types are only supported for imports.
+
+Providing Hive and HCatalog Libraries for the Sqoop Job
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+With the support for HCatalog added to Sqoop, any HCatalog job depends on a
+set of jar files being available both on the Sqoop client host and where the
+Map/Reduce tasks run. To run HCatalog jobs, the environment variable
++HADOOP_CLASSPATH+ must be set up as shown below before launching the Sqoop
+HCatalog jobs.
+
++HADOOP_CLASSPATH=$(hcat -classpath)+
++export HADOOP_CLASSPATH+
+
+
+The necessary HCatalog dependencies will be copied to the distributed cache
+automatically by the Sqoop job.
+
+Examples
+~~~~~~~~
+
+Create an HCatalog table, such as:
+
++hcat -e "create table txn(txn_date string, cust_id string, amount float,
+store_id int) partitioned by (cust_id string) stored as rcfile;"+
+
+
+Then Sqoop import and export of the "txn" HCatalog table can be invoked as
+follows:
+
+Import
+~~~~~~
+
++$SQOOP_HOME/bin/sqoop import --connect <jdbc-url> -table <table-name> --hcatalog-table txn <other sqoop options>+
+
+Export
+~~~~~~
+
++$SQOOP_HOME/bin/sqoop export --connect <jdbc-url> -table <table-name> --hcatalog-table txn <other sqoop options>+
index f18d43e..4be6a6a 100644 (file)
@@ -59,6 +59,10 @@ public class SqoopOptions implements Cloneable {
   public static final String METASTORE_PASSWORD_KEY =
       "sqoop.metastore.client.record.password";
 
+  // Default hive and hcat locations.
+  public static final String DEF_HIVE_HOME = "/usr/lib/hive";
+  public static final String DEF_HCAT_HOME = "/usr/lib/hcatalog";
+
   public static final boolean METASTORE_PASSWORD_DEFAULT = false;
 
   /**
@@ -151,6 +155,15 @@ public class SqoopOptions implements Cloneable {
   private String hiveDelimsReplacement;
   @StoredAsProperty("hive.partition.key") private String hivePartitionKey;
   @StoredAsProperty("hive.partition.value") private String hivePartitionValue;
+  @StoredAsProperty("hcatalog.table.name")
+  private String hCatTableName;
+  @StoredAsProperty("hcatalog.database.name")
+  private String hCatDatabaseName;
+  @StoredAsProperty("hcatalog.create.table")
+  private boolean hCatCreateTable;
+  @StoredAsProperty("hcatalog.storage.stanza")
+  private String hCatStorageStanza;
+  private String hCatHome; // not serialized to metastore.
 
   // User explicit mapping of types
   private Properties mapColumnJava; // stored as map.colum.java
@@ -197,7 +210,9 @@ public class SqoopOptions implements Cloneable {
 
   private DelimiterSet inputDelimiters; // codegen.input.delimiters.
   private DelimiterSet outputDelimiters; // codegen.output.delimiters.
-  private boolean areDelimsManuallySet;
+
+  private boolean areOutputDelimsManuallySet;
+  private boolean areInputDelimsManuallySet;
 
   private Configuration conf;
 
@@ -580,7 +595,8 @@ public class SqoopOptions implements Cloneable {
 
     // Delimiters were previously memoized; don't let the tool override
     // them with defaults.
-    this.areDelimsManuallySet = true;
+    this.areOutputDelimsManuallySet = true;
+    this.areInputDelimsManuallySet = true;
 
     // If we loaded true verbose flag, we need to apply it
     if (this.verbose) {
@@ -804,7 +820,21 @@ public class SqoopOptions implements Cloneable {
   public static String getHiveHomeDefault() {
     // Set this with $HIVE_HOME, but -Dhive.home can override.
     String hiveHome = System.getenv("HIVE_HOME");
-    return System.getProperty("hive.home", hiveHome);
+    hiveHome = System.getProperty("hive.home", hiveHome);
+    if (hiveHome == null) {
+      hiveHome = DEF_HIVE_HOME;
+    }
+    return hiveHome;
+  }
+
+  public static String getHCatHomeDefault() {
+    // Set this with $HCAT_HOME, but -Dhcatalog.home can override.
+    String hcatHome = System.getenv("HCAT_HOME");
+    hcatHome = System.getProperty("hcat.home", hcatHome);
+    if (hcatHome == null) {
+      hcatHome = DEF_HCAT_HOME;
+    }
+    return hcatHome;
   }
 
   private void initDefaults(Configuration baseConfiguration) {
@@ -813,6 +843,7 @@ public class SqoopOptions implements Cloneable {
     this.hadoopMapRedHome = System.getenv("HADOOP_MAPRED_HOME");
 
     this.hiveHome = getHiveHomeDefault();
+    this.hCatHome = getHCatHomeDefault();
 
     this.inputDelimiters = new DelimiterSet(
         DelimiterSet.NULL_CHAR, DelimiterSet.NULL_CHAR,
@@ -834,7 +865,8 @@ public class SqoopOptions implements Cloneable {
     this.jarDirIsAuto = true;
     this.layout = FileLayout.TextFile;
 
-    this.areDelimsManuallySet = false;
+    this.areOutputDelimsManuallySet = false;
+    this.areInputDelimsManuallySet = false;
 
     this.numMappers = DEFAULT_NUM_MAPPERS;
     this.useCompression = false;
@@ -1263,6 +1295,47 @@ public class SqoopOptions implements Cloneable {
     this.failIfHiveTableExists = fail;
   }
 
+  // HCatalog support
+  public void setHCatTableName(String ht) {
+    this.hCatTableName = ht;
+  }
+
+  public String getHCatTableName() {
+    return this.hCatTableName;
+  }
+
+  public void setHCatDatabaseName(String hd) {
+    this.hCatDatabaseName = hd;
+  }
+
+  public String getHCatDatabaseName() {
+    return this.hCatDatabaseName;
+  }
+
+
+  public String getHCatHome() {
+    return hCatHome;
+  }
+
+  public void setHCatHome(String home) {
+    this.hCatHome = home;
+  }
+
+  public boolean doCreateHCatalogTable() {
+    return hCatCreateTable;
+  }
+
+  public void setCreateHCatalogTable(boolean create) {
+    this.hCatCreateTable = create;
+  }
+
+  public void setHCatStorageStanza(String stanza) {
+    this.hCatStorageStanza = stanza;
+  }
+
+  public String getHCatStorageStanza() {
+    return this.hCatStorageStanza;
+  }
   /**
    * @return location where .java files go; guaranteed to end with '/'.
    */
@@ -1673,18 +1746,32 @@ public class SqoopOptions implements Cloneable {
     this.fetchSize = size;
   }
 
+  /*
+   * @return true if the output delimiters have been explicitly set by the user
+   */
+  public boolean explicitOutputDelims() {
+    return areOutputDelimsManuallySet;
+  }
+
   /**
-   * @return true if the delimiters have been explicitly set by the user.
+   * Flag the output delimiter settings as explicit user settings, or implicit.
    */
-  public boolean explicitDelims() {
-    return areDelimsManuallySet;
+  public void setExplicitOutputDelims(boolean explicit) {
+    this.areOutputDelimsManuallySet = explicit;
   }
 
   /**
-   * Flag the delimiter settings as explicit user settings, or implicit.
+   * @return true if the input delimiters have been explicitly set by the user.
    */
-  public void setExplicitDelims(boolean explicit) {
-    this.areDelimsManuallySet = explicit;
+  public boolean explicitInputDelims() {
+    return areInputDelimsManuallySet;
+  }
+
+  /**
+   * Flag the input delimiter settings as explicit user settings, or implicit.
+    */
+  public void setExplicitInputDelims(boolean explicit) {
+    this.areInputDelimsManuallySet = explicit;
   }
 
   public Configuration getConf() {
index 5354063..2070b63 100644 (file)
@@ -60,6 +60,18 @@ public final class ConfigurationConstants {
   public static final String PROP_MAPRED_JOB_TRACKER_ADDRESS =
                                 "mapred.job.tracker";
 
+   /**
+   * The Configuration property identifying the job tracker address (new).
+   */
+  public static final String PROP_MAPREDUCE_JOB_TRACKER_ADDRESS =
+    "mapreduce.jobtracker.address";
+
+  /**
+   * The Configuration property identifying the framework name. If set to YARN
+   * then we will not be in local mode.
+   */
+  public static final String PROP_MAPREDUCE_FRAMEWORK_NAME =
+    "mapreduce.framework.name";
   /**
    * The group name of task counters.
    */
@@ -78,6 +90,11 @@ public final class ConfigurationConstants {
   public static final String COUNTER_MAP_INPUT_RECORDS =
                                 "MAP_INPUT_RECORDS";
 
+  /**
+   * The name of the parameter for ToolRunner to set jars to add to distcache.
+   */
+  public static final String MAPRED_DISTCACHE_CONF_PARAM = "tmpjars";
+
   private ConfigurationConstants() {
     // Disable Explicit Object Creation
   }
index 838f083..02596a6 100644 (file)
@@ -60,6 +60,15 @@ public class HiveImport {
   private ConnManager connManager;
   private Configuration configuration;
   private boolean generateOnly;
+  private static boolean testMode = false;
+
+  public static boolean getTestMode() {
+    return testMode;
+  }
+
+  public static void setTestMode(boolean mode) {
+    testMode = mode;
+  }
 
   /** Entry point through which Hive invocation should be attempted. */
   private static final String HIVE_MAIN_CLASS =
@@ -285,6 +294,14 @@ public class HiveImport {
       throws IOException {
     SubprocessSecurityManager subprocessSM = null;
 
+    if (testMode) {
+      // We use external mock hive process for test mode as
+      // HCatalog dependency would have brought in Hive classes.
+      LOG.debug("Using external Hive process in test mode.");
+      executeExternalHiveScript(filename, env);
+      return;
+    }
+
     try {
       Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
 
index a1ac38e..3549bda 100644 (file)
@@ -164,6 +164,70 @@ public abstract class ConnManager {
     return HiveTypes.toHiveType(sqlType);
   }
 
+   /**
+   * Resolve a database-specific type to HCat data type. Largely follows Sqoop's
+   * hive translation.
+   * @param sqlType
+   *          sql type
+   * @return hcat type
+   */
+  public String toHCatType(int sqlType) {
+    switch (sqlType) {
+
+    // Ideally TINYINT and SMALLINT should be mapped to their
+    // HCat equivalents tinyint and smallint respectively
+    // But the Sqoop Java type conversion has them mapped to Integer
+    // Even though the referenced Java doc clearly recommends otherwise.
+    // Chaning this now can cause many of the sequence file usages to
+    // break as value class implementations will change. So, we
+    // just use the same behavior here.
+      case Types.SMALLINT:
+      case Types.TINYINT:
+      case Types.INTEGER:
+        return "int";
+
+      case Types.VARCHAR:
+      case Types.CHAR:
+      case Types.LONGVARCHAR:
+      case Types.NVARCHAR:
+      case Types.NCHAR:
+      case Types.LONGNVARCHAR:
+      case Types.DATE:
+      case Types.TIME:
+      case Types.TIMESTAMP:
+      case Types.CLOB:
+        return "string";
+
+      case Types.FLOAT:
+      case Types.REAL:
+        return "float";
+
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        return "string";
+
+      case Types.DOUBLE:
+        return "double";
+
+      case Types.BIT:
+      case Types.BOOLEAN:
+        return "boolean";
+
+      case Types.BIGINT:
+        return "bigint";
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.BLOB:
+      case Types.LONGVARBINARY:
+        return "binary";
+
+      default:
+        throw new IllegalArgumentException(
+          "Cannot convert SQL type to HCatalog type " + sqlType);
+    }
+  }
+
   /**
    * Resolve a database-specific type to Avro data type.
    * @param sqlType     sql type
index ef1d363..5afd90c 100644 (file)
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import org.apache.avro.Schema;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.lib.LargeObjectLoader;
@@ -63,6 +65,13 @@ public class DataDrivenImportJob extends ImportJobBase {
   @Override
   protected void configureMapper(Job job, String tableName,
       String tableClassName) throws IOException {
+    if (isHCatJob) {
+      LOG.info("Configuring mapper for HCatalog import job");
+      job.setOutputKeyClass(LongWritable.class);
+      job.setOutputValueClass(SqoopHCatUtilities.getImportValueClass());
+      job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
+      return;
+    }
     if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       // For text files, specify these as the output types; for
       // other types, we just use the defaults.
@@ -82,6 +91,9 @@ public class DataDrivenImportJob extends ImportJobBase {
 
   @Override
   protected Class<? extends Mapper> getMapperClass() {
+    if (options.getHCatTableName() != null) {
+      return SqoopHCatUtilities.getImportMapperClass();
+    }
     if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       return TextImportMapper.class;
     } else if (options.getFileLayout()
@@ -98,6 +110,10 @@ public class DataDrivenImportJob extends ImportJobBase {
   @Override
   protected Class<? extends OutputFormat> getOutputFormatClass()
       throws ClassNotFoundException {
+    if (isHCatJob) {
+      LOG.debug("Returning HCatOutputFormat for output format");
+      return SqoopHCatUtilities.getOutputFormatClass();
+    }
     if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       return RawKeyTextOutputFormat.class;
     } else if (options.getFileLayout()
index 1065d0b..d0be570 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.PerfCounters;
 import com.cloudera.sqoop.SqoopOptions;
@@ -57,7 +58,7 @@ public class ExportJobBase extends JobBase {
    * The (inferred) type of a file or group of files.
    */
   public enum FileType {
-    SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
+    SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN
   }
 
   public static final Log LOG = LogFactory.getLog(
@@ -80,6 +81,7 @@ public class ExportJobBase extends JobBase {
 
   protected ExportJobContext context;
 
+
   public ExportJobBase(final ExportJobContext ctxt) {
     this(ctxt, null, null, null);
   }
@@ -195,6 +197,9 @@ public class ExportJobBase extends JobBase {
    * @return the Path to the files we are going to export to the db.
    */
   protected Path getInputPath() throws IOException {
+    if (isHCatJob) {
+      return null;
+    }
     Path inputPath = new Path(context.getOptions().getExportDir());
     Configuration conf = options.getConf();
     inputPath = inputPath.makeQualified(FileSystem.get(conf));
@@ -207,7 +212,9 @@ public class ExportJobBase extends JobBase {
       throws ClassNotFoundException, IOException {
 
     super.configureInputFormat(job, tableName, tableClassName, splitByCol);
-    FileInputFormat.addInputPath(job, getInputPath());
+    if (!isHCatJob) {
+      FileInputFormat.addInputPath(job, getInputPath());
+    }
   }
 
   @Override
@@ -371,6 +378,12 @@ public class ExportJobBase extends JobBase {
       }
 
       propagateOptionsToJob(job);
+      if (isHCatJob) {
+        LOG.info("Configuring HCatalog for export job");
+        SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+        hCatUtils.configureHCat(options, job, cmgr, tableName,
+          job.getConfiguration());
+      }
       configureInputFormat(job, tableName, tableClassName, null);
       configureOutputFormat(job, tableName, tableClassName);
       configureMapper(job, tableName, tableClassName);
@@ -448,6 +461,9 @@ public class ExportJobBase extends JobBase {
   }
 
   protected FileType getInputFileType() {
+    if (isHCatJob) {
+      return FileType.HCATALOG_MANAGED_FILE;
+    }
     try {
       return getFileType(context.getOptions().getConf(), getInputPath());
     } catch (IOException ioe) {
index 2465f3f..ab7f21e 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.PerfCounters;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
@@ -92,6 +93,13 @@ public class ImportJobBase extends JobBase {
 
     job.setOutputFormatClass(getOutputFormatClass());
 
+    if (isHCatJob) {
+      LOG.debug("Configuring output format for HCatalog  import job");
+      SqoopHCatUtilities.configureImportOutputFormat(options, job,
+        getContext().getConnManager(), tableName, job.getConfiguration());
+      return;
+    }
+
     if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
       job.getConfiguration().set("mapred.output.value.class", tableClassName);
     }
@@ -149,6 +157,11 @@ public class ImportJobBase extends JobBase {
     perfCounters.startClock();
 
     boolean success = doSubmitJob(job);
+
+    if (isHCatJob) {
+      SqoopHCatUtilities.instance().invokeOutputCommitterForLocalMode(job);
+    }
+
     perfCounters.stopClock();
 
     Counters jobCounters = job.getCounters();
index 20636a0..fee78e0 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.mapreduce.ExportJobBase;
@@ -65,7 +66,11 @@ public class JdbcExportJob extends ExportJobBase {
 
     super.configureInputFormat(job, tableName, tableClassName, splitByCol);
 
-    if (fileType == FileType.AVRO_DATA_FILE) {
+    if (isHCatJob) {
+      SqoopHCatUtilities.configureExportInputFormat(options, job,
+        context.getConnManager(), tableName, job.getConfiguration());
+      return;
+    } else if (fileType == FileType.AVRO_DATA_FILE) {
       LOG.debug("Configuring for Avro export");
       ConnManager connManager = context.getConnManager();
       Map<String, Integer> columnTypeInts;
@@ -93,6 +98,9 @@ public class JdbcExportJob extends ExportJobBase {
   @Override
   protected Class<? extends InputFormat> getInputFormatClass()
       throws ClassNotFoundException {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getInputFormatClass();
+    }
     if (fileType == FileType.AVRO_DATA_FILE) {
       return AvroInputFormat.class;
     }
@@ -101,6 +109,9 @@ public class JdbcExportJob extends ExportJobBase {
 
   @Override
   protected Class<? extends Mapper> getMapperClass() {
+    if (isHCatJob) {
+      return SqoopHCatUtilities.getExportMapperClass();
+    }
     switch (fileType) {
       case SEQUENCE_FILE:
         return SequenceFileExportMapper.class;
index 0df1156..322df1c 100644 (file)
@@ -56,6 +56,7 @@ public class JobBase {
   private Job mrJob;
 
   private ClassLoader prevClassLoader = null;
+  protected final boolean isHCatJob;
 
   public static final String PROPERTY_VERBOSE = "sqoop.verbose";
 
@@ -76,6 +77,7 @@ public class JobBase {
     this.mapperClass = mapperClass;
     this.inputFormatClass = inputFormatClass;
     this.outputFormatClass = outputFormatClass;
+    isHCatJob = options.getHCatTableName() != null;
   }
 
   /**
@@ -220,7 +222,7 @@ public class JobBase {
    */
   protected void loadJars(Configuration conf, String ormJarFile,
       String tableClassName) throws IOException {
+
     boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
         || "local".equals(conf.get("mapred.job.tracker"));
     if (isLocal) {
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportFormat.java
new file mode 100644 (file)
index 0000000..47febf7
--- /dev/null
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.sqoop.mapreduce.ExportInputFormat;
+
+/**
+ * A combined HCatInputFormat equivalent that allows us to generate the number
+ * of splits to the number of map tasks.
+ *
+ * The logic is simple. We get the list of splits for HCatInputFormat. If it is
+ * less than the number of mappers, all is good. Else, we sort the splits by
+ * size and assign them to each of the mappers in a simple scheme. After
+ * assigning the splits to each of the mapper, for the next round we start with
+ * the mapper that got the last split. That way, the size of the split is
+ * distributed in a more uniform fashion than a simple round-robin assignment.
+ */
+public class SqoopHCatExportFormat extends HCatInputFormat {
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatExportFormat.class.getName());
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job)
+    throws IOException, InterruptedException {
+    List<InputSplit> hCatSplits = super.getSplits(job);
+    int hCatSplitCount = hCatSplits.size();
+    int expectedSplitCount = ExportInputFormat.getNumMapTasks(job);
+    if (expectedSplitCount == 0) {
+      expectedSplitCount = hCatSplitCount;
+    }
+    LOG.debug("Expected split count " + expectedSplitCount);
+    LOG.debug("HCatInputFormat provided split count " + hCatSplitCount);
+    // Sort the splits by length descending.
+
+    Collections.sort(hCatSplits, new Comparator<InputSplit>() {
+      @Override
+      public int compare(InputSplit is1, InputSplit is2) {
+        try {
+          return (int) (is2.getLength() - is1.getLength());
+        } catch (Exception e) {
+          LOG.warn("Exception caught while sorting Input splits " + e);
+        }
+        return 0;
+      }
+    });
+    List<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+
+    // The number of splits generated by HCatInputFormat is within
+    // our limits
+
+    if (hCatSplitCount <= expectedSplitCount) {
+      for (InputSplit split : hCatSplits) {
+        List<InputSplit> hcSplitList = new ArrayList<InputSplit>();
+        hcSplitList.add(split);
+        combinedSplits.add(new SqoopHCatInputSplit(hcSplitList));
+      }
+      return combinedSplits;
+    }
+    List<List<InputSplit>> combinedSplitList =
+      new ArrayList<List<InputSplit>>();
+    for (int i = 0; i < expectedSplitCount; i++) {
+      combinedSplitList.add(new ArrayList<InputSplit>());
+    }
+    boolean ascendingAssigment = true;
+
+    int lastSet = 0;
+    for (int i = 0; i < hCatSplitCount; ++i) {
+      int splitNum = i % expectedSplitCount;
+      int currentSet = i / expectedSplitCount;
+      if (currentSet != lastSet) {
+        ascendingAssigment = !ascendingAssigment;
+      }
+      if (ascendingAssigment) {
+        combinedSplitList.get(splitNum).add(hCatSplits.get(i));
+      } else {
+        combinedSplitList.
+          get(expectedSplitCount - 1 - splitNum).add(hCatSplits.get(i));
+      }
+      lastSet = currentSet;
+    }
+    for (int i = 0; i < expectedSplitCount; i++) {
+      SqoopHCatInputSplit sqoopSplit =
+        new SqoopHCatInputSplit(combinedSplitList.get(i));
+      combinedSplits.add(sqoopSplit);
+    }
+
+    return combinedSplits;
+
+  }
+
+  @Override
+  public RecordReader<WritableComparable, HCatRecord>
+    createRecordReader(InputSplit split,
+      TaskAttemptContext taskContext)
+      throws IOException, InterruptedException {
+    LOG.debug("Creating a SqoopHCatRecordReader");
+    return new SqoopHCatRecordReader(split, taskContext, this);
+  }
+
+  public RecordReader<WritableComparable, HCatRecord>
+    createHCatRecordReader(InputSplit split,
+      TaskAttemptContext taskContext)
+      throws IOException, InterruptedException {
+    LOG.debug("Creating a base HCatRecordReader");
+    return super.createRecordReader(split, taskContext);
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatExportMapper.java
new file mode 100644 (file)
index 0000000..539cedf
--- /dev/null
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+
+/**
+ * A mapper that works on combined hcat splits.
+ */
+public class SqoopHCatExportMapper
+    extends
+  AutoProgressMapper<WritableComparable, HCatRecord,
+  SqoopRecord, WritableComparable> {
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatExportMapper.class.getName());
+  private InputJobInfo jobInfo;
+  private HCatSchema hCatFullTableSchema;
+  private List<HCatFieldSchema> hCatSchemaFields;
+
+  private SqoopRecord sqoopRecord;
+  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
+  private static final String TIME_TYPE = "java.sql.Time";
+  private static final String DATE_TYPE = "java.sql.Date";
+  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
+  private static final String FLOAT_TYPE = "Float";
+  private static final String DOUBLE_TYPE = "Double";
+  private static final String BYTE_TYPE = "Byte";
+  private static final String SHORT_TYPE = "Short";
+  private static final String INTEGER_TYPE = "Integer";
+  private static final String LONG_TYPE = "Long";
+  private static final String BOOLEAN_TYPE = "Boolean";
+  private static final String STRING_TYPE = "String";
+  private static final String BYTESWRITABLE =
+    "org.apache.hadoop.io.BytesWritable";
+  private static boolean debugHCatExportMapper = false;
+  private MapWritable colTypesJava;
+  private MapWritable colTypesSql;
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    colTypesJava = DefaultStringifier.load(conf,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
+    colTypesSql = DefaultStringifier.load(conf,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
+    // Instantiate a copy of the user's class to hold and parse the record.
+
+    String recordClassName = conf.get(
+      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
+    if (null == recordClassName) {
+      throw new IOException("Export table class name ("
+        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
+        + ") is not set!");
+    }
+    debugHCatExportMapper = conf.getBoolean(
+      SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
+    try {
+      Class cls = Class.forName(recordClassName, true,
+        Thread.currentThread().getContextClassLoader());
+      sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (null == sqoopRecord) {
+      throw new IOException("Could not instantiate object of type "
+        + recordClassName);
+    }
+
+    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    jobInfo =
+      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+    HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
+    HCatSchema partitionSchema =
+      jobInfo.getTableInfo().getPartitionColumns();
+    hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
+    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+      hCatFullTableSchema.append(hfs);
+    }
+    hCatSchemaFields = hCatFullTableSchema.getFields();
+
+  }
+
+  @Override
+  public void map(WritableComparable key, HCatRecord value,
+    Context context)
+    throws IOException, InterruptedException {
+    context.write(convertToSqoopRecord(value), NullWritable.get());
+  }
+
+  private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
+    throws IOException {
+    Text key = new Text();
+    for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
+      String colName = e.getKey();
+      String hfn = colName.toLowerCase();
+      key.set(hfn);
+      String javaColType = colTypesJava.get(key).toString();
+      int sqlType = ((IntWritable) colTypesSql.get(key)).get();
+      HCatFieldSchema field =
+        hCatFullTableSchema.get(hfn);
+      HCatFieldSchema.Type fieldType = field.getType();
+      Object hCatVal =
+        hcr.get(hfn, hCatFullTableSchema);
+      String hCatTypeString = field.getTypeString();
+      Object sqlVal = convertToSqoop(hCatVal, fieldType,
+        javaColType, hCatTypeString);
+      if (debugHCatExportMapper) {
+        LOG.debug("hCatVal " + hCatVal + " of type "
+          + (hCatVal == null ? null : hCatVal.getClass().getName())
+          + ",sqlVal " + sqlVal + " of type "
+          + (sqlVal == null ? null : sqlVal.getClass().getName())
+          + ",java type " + javaColType + ", sql type = "
+          + SqoopHCatUtilities.sqlTypeString(sqlType));
+      }
+      sqoopRecord.setField(colName, sqlVal);
+    }
+    return sqoopRecord;
+  }
+
+  private Object convertToSqoop(Object val,
+    HCatFieldSchema.Type fieldType, String javaColType,
+    String hCatTypeString) throws IOException {
+
+    if (val == null) {
+      return null;
+    }
+
+    switch (fieldType) {
+      case INT:
+      case TINYINT:
+      case SMALLINT:
+      case FLOAT:
+      case DOUBLE:
+        val = convertNumberTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BOOLEAN:
+        val = convertBooleanTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BIGINT:
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date((Long) val);
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time((Long) val);
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp((Long) val);
+        } else {
+          val = convertNumberTypes(val, javaColType);
+          if (val != null) {
+            return val;
+          }
+        }
+        break;
+      case STRING:
+        val = convertStringTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case BINARY:
+        val = convertBinaryTypes(val, javaColType);
+        if (val != null) {
+          return val;
+        }
+        break;
+      case ARRAY:
+      case MAP:
+      case STRUCT:
+      default:
+        throw new IOException("Cannot convert HCatalog type "
+          + fieldType);
+    }
+    LOG.error("Cannot convert HCatalog object of "
+      + " type " + hCatTypeString + " to java object type "
+      + javaColType);
+    return null;
+  }
+
+  private Object convertBinaryTypes(Object val, String javaColType) {
+    byte[] bb = (byte[]) val;
+    if (javaColType.equals(BYTESWRITABLE)) {
+      BytesWritable bw = new BytesWritable();
+      bw.set(bb, 0, bb.length);
+      return bw;
+    }
+    return null;
+  }
+
+  private Object convertStringTypes(Object val, String javaColType) {
+    String valStr = val.toString();
+    if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(valStr);
+    } else if (javaColType.equals(DATE_TYPE)
+      || javaColType.equals(TIME_TYPE)
+      || javaColType.equals(TIMESTAMP_TYPE)) {
+      // Oracle expects timestamps for Date also by default based on version
+      // Just allow all date types to be assignment compatible
+      if (valStr.length() == 10) { // Date in yyyy-mm-dd format
+        Date d = Date.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return d;
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time(d.getTime());
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp(d.getTime());
+        }
+      } else if (valStr.length() == 8) { // time in hh:mm:ss
+        Time t = Time.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date(t.getTime());
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return t;
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return new Timestamp(t.getTime());
+        }
+      } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:ss:mm
+        Timestamp ts = Timestamp.valueOf(valStr);
+        if (javaColType.equals(DATE_TYPE)) {
+          return new Date(ts.getTime());
+        } else if (javaColType.equals(TIME_TYPE)) {
+          return new Time(ts.getTime());
+        } else if (javaColType.equals(TIMESTAMP_TYPE)) {
+          return ts;
+        }
+      } else {
+        return null;
+      }
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return valStr;
+    } else if (javaColType.equals(BOOLEAN_TYPE)) {
+      return Boolean.valueOf(valStr);
+    } else if (javaColType.equals(BYTE_TYPE)) {
+      return Byte.parseByte(valStr);
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return Short.parseShort(valStr);
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return Integer.parseInt(valStr);
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return Long.parseLong(valStr);
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return Float.parseFloat(valStr);
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return Double.parseDouble(valStr);
+    }
+    return null;
+  }
+
+  private Object convertBooleanTypes(Object val, String javaColType) {
+    Boolean b = (Boolean) val;
+    if (javaColType.equals(BOOLEAN_TYPE)) {
+      return b;
+    } else if (javaColType.equals(BYTE_TYPE)) {
+      return (byte) (b ? 1 : 0);
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return (short) (b ? 1 : 0);
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return (int) (b ? 1 : 0);
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return (long) (b ? 1 : 0);
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return (float) (b ? 1 : 0);
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return (double) (b ? 1 : 0);
+    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(b ? 1 : 0);
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return val.toString();
+    }
+    return null;
+  }
+
+  private Object convertNumberTypes(Object val, String javaColType) {
+    Number n = (Number) val;
+    if (javaColType.equals(BYTE_TYPE)) {
+      return n.byteValue();
+    } else if (javaColType.equals(SHORT_TYPE)) {
+      return n.shortValue();
+    } else if (javaColType.equals(INTEGER_TYPE)) {
+      return n.intValue();
+    } else if (javaColType.equals(LONG_TYPE)) {
+      return n.longValue();
+    } else if (javaColType.equals(FLOAT_TYPE)) {
+      return n.floatValue();
+    } else if (javaColType.equals(DOUBLE_TYPE)) {
+      return n.doubleValue();
+    } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
+      return new BigDecimal(n.doubleValue());
+    } else if (javaColType.equals(BOOLEAN_TYPE)) {
+      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+    } else if (javaColType.equals(STRING_TYPE)) {
+      return n.toString();
+    }
+    return null;
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
new file mode 100644 (file)
index 0000000..4f0ff1b
--- /dev/null
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.lib.FieldFormatter;
+import com.cloudera.sqoop.lib.LargeObjectLoader;
+
+/**
+ * A mapper for HCatalog import.
+ */
+public class SqoopHCatImportMapper extends
+  SqoopMapper<WritableComparable, SqoopRecord,
+  WritableComparable, HCatRecord> {
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatImportMapper.class.getName());
+
+  private static boolean debugHCatImportMapper = false;
+
+  private InputJobInfo jobInfo;
+  private HCatSchema hCatFullTableSchema;
+  private int fieldCount;
+  private boolean bigDecimalFormatString;
+  private LargeObjectLoader lobLoader;
+  private HCatSchema partitionSchema = null;
+  private HCatSchema dataColsSchema = null;
+  private String stringDelimiterReplacements = null;
+  private ArrayWritable delimCharsArray;
+  private String hiveDelimsReplacement;
+  private boolean doHiveDelimsReplacement = false;
+  private DelimiterSet hiveDelimiters;
+  private String staticPartitionKey;
+  private int[] hCatFieldPositions;
+  private int colCount;
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+    jobInfo =
+      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
+    dataColsSchema = jobInfo.getTableInfo().getDataColumns();
+    partitionSchema =
+      jobInfo.getTableInfo().getPartitionColumns();
+    StringBuilder storerInfoStr = new StringBuilder(1024);
+    StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
+    storerInfoStr.append("HCatalog Storer Info : ")
+      .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
+      .append("\n\tInput format class = ").append(storerInfo.getIfClass())
+      .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
+      .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
+    Properties storerProperties = storerInfo.getProperties();
+    if (!storerProperties.isEmpty()) {
+      storerInfoStr.append("\nStorer properties ");
+      for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
+        String key = (String) entry.getKey();
+        Object val = entry.getValue();
+        storerInfoStr.append("\n\t").append(key).append('=').append(val);
+      }
+    }
+    storerInfoStr.append("\n");
+    LOG.info(storerInfoStr);
+
+    hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
+    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
+      hCatFullTableSchema.append(hfs);
+    }
+    fieldCount = hCatFullTableSchema.size();
+    lobLoader = new LargeObjectLoader(conf,
+      new Path(jobInfo.getTableInfo().getTableLocation()));
+    bigDecimalFormatString = conf.getBoolean(
+      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+    debugHCatImportMapper = conf.getBoolean(
+      SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
+    IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
+        SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
+    hiveDelimiters = new DelimiterSet(
+      (char) delimChars[0].get(), (char) delimChars[1].get(),
+      (char) delimChars[2].get(), (char) delimChars[3].get(),
+      delimChars[4].get() == 1 ? true : false);
+    hiveDelimsReplacement =
+      conf.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
+    if (hiveDelimsReplacement == null) {
+      hiveDelimsReplacement = "";
+    }
+    doHiveDelimsReplacement = Boolean.valueOf(conf.get(
+      SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));
+
+    IntWritable[] fPos = DefaultStringifier.loadArray(conf,
+        SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
+    hCatFieldPositions = new int[fPos.length];
+    for (int i = 0; i < fPos.length; ++i) {
+      hCatFieldPositions[i] = fPos[i].get();
+    }
+
+    LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
+    LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
+    LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
+    staticPartitionKey =
+      conf.get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
+    LOG.debug("Static partition key used : " + staticPartitionKey);
+
+
+  }
+
+  @Override
+  public void map(WritableComparable key, SqoopRecord value,
+    Context context)
+    throws IOException, InterruptedException {
+
+    try {
+      // Loading of LOBs was delayed until we have a Context.
+      value.loadLargeObjects(lobLoader);
+    } catch (SQLException sqlE) {
+      throw new IOException(sqlE);
+    }
+    if (colCount == -1) {
+      colCount = value.getFieldMap().size();
+    }
+    context.write(key, convertToHCatRecord(value));
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException {
+    if (null != lobLoader) {
+      lobLoader.close();
+    }
+  }
+
+  private HCatRecord convertToHCatRecord(SqoopRecord sqr)
+    throws IOException {
+    Map<String, Object> fieldMap = sqr.getFieldMap();
+    HCatRecord result = new DefaultHCatRecord(fieldCount);
+
+    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
+      String key = entry.getKey();
+      Object val = entry.getValue();
+      String hfn = key.toLowerCase();
+      if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
+        continue;
+      }
+      HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
+      if (debugHCatImportMapper) {
+        LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
+          + " of type " + (val == null ? null : val.getClass().getName())
+          + ", hcattype " + hfs.getTypeString());
+      }
+      Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());
+
+      result.set(hfn, hCatFullTableSchema, hCatVal);
+    }
+
+    return result;
+  }
+
+
+  private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
+    String hCatTypeString) {
+
+    if (val == null) {
+      return null;
+    }
+
+    Object retVal = null;
+
+    if (val instanceof Number) {
+      retVal = convertNumberTypes(val, hfsType);
+    } else if (val instanceof Boolean) {
+      retVal = convertBooleanTypes(val, hfsType);
+    } else if (val instanceof String) {
+      if (hfsType == HCatFieldSchema.Type.STRING) {
+        String str = (String) val;
+        if (doHiveDelimsReplacement) {
+          retVal = FieldFormatter
+            .hiveStringReplaceDelims(str, hiveDelimsReplacement,
+                hiveDelimiters);
+        } else {
+          retVal = str;
+        }
+      }
+    } else if (val instanceof java.util.Date) {
+      retVal = converDateTypes(val, hfsType);
+    } else if (val instanceof BytesWritable) {
+      if (hfsType == HCatFieldSchema.Type.BINARY) {
+        BytesWritable bw = (BytesWritable) val;
+        retVal = bw.getBytes();
+      }
+    } else if (val instanceof BlobRef) {
+      if (hfsType == HCatFieldSchema.Type.BINARY) {
+        BlobRef br = (BlobRef) val;
+        byte[] bytes = br.isExternal() ? br.toString().getBytes()
+          : br.getData();
+        retVal = bytes;
+      }
+    } else if (val instanceof ClobRef) {
+      if (hfsType == HCatFieldSchema.Type.STRING) {
+        ClobRef cr = (ClobRef) val;
+        String s = cr.isExternal() ? cr.toString() : cr.getData();
+        retVal = s;
+      }
+    } else {
+      throw new UnsupportedOperationException("Objects of type "
+        + val.getClass().getName() + " are not suported");
+    }
+    if (retVal == null) {
+      LOG.error("Objects of type "
+        + val.getClass().getName() + " can not be mapped to HCatalog type "
+        + hCatTypeString);
+    }
+    return retVal;
+  }
+
+  private Object converDateTypes(Object val,
+    HCatFieldSchema.Type hfsType) {
+    if (val instanceof java.sql.Date) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Date) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    } else if (val instanceof java.sql.Time) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Time) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    } else if (val instanceof java.sql.Timestamp) {
+      if (hfsType == HCatFieldSchema.Type.BIGINT) {
+        return ((Timestamp) val).getTime();
+      } else if (hfsType == HCatFieldSchema.Type.STRING) {
+        return val.toString();
+      }
+    }
+    return null;
+  }
+
+  private Object convertBooleanTypes(Object val,
+    HCatFieldSchema.Type hfsType) {
+    Boolean b = (Boolean) val;
+    if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+      return b;
+    } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
+      return (byte) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+      return (short) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.INT) {
+      return (int) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+      return (long) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+      return (float) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+      return (double) (b ? 1 : 0);
+    } else if (hfsType == HCatFieldSchema.Type.STRING) {
+      return val.toString();
+    }
+    return null;
+  }
+
+  private Object convertNumberTypes(Object val,
+    HCatFieldSchema.Type hfsType) {
+    if (!(val instanceof Number)) {
+      return null;
+    }
+    if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
+      BigDecimal bd = (BigDecimal) val;
+      if (bigDecimalFormatString) {
+        return bd.toPlainString();
+      } else {
+        return bd.toString();
+      }
+    }
+    Number n = (Number) val;
+    if (hfsType == HCatFieldSchema.Type.TINYINT) {
+      return n.byteValue();
+    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
+      return n.shortValue();
+    } else if (hfsType == HCatFieldSchema.Type.INT) {
+      return n.intValue();
+    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
+      return n.longValue();
+    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
+      return n.floatValue();
+    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
+      return n.doubleValue();
+    } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
+      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
+    } else if (hfsType == HCatFieldSchema.Type.STRING) {
+      return n.toString();
+    }
+    return null;
+  }
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatInputSplit.java
new file mode 100644 (file)
index 0000000..5a2e48a
--- /dev/null
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * An abstraction of a combined HCatSplits.
+ *
+ */
+public class SqoopHCatInputSplit extends InputSplit implements Writable {
+  private List<HCatSplit> hCatSplits;
+  private String[] hCatLocations;
+  private long inputLength;
+
+  public SqoopHCatInputSplit() {
+  }
+
+  public SqoopHCatInputSplit(List<InputSplit> splits) {
+    hCatSplits = new ArrayList<HCatSplit>();
+    Set<String> locations = new HashSet<String>();
+    for (int i = 0; i < splits.size(); ++i) {
+      HCatSplit hsSplit = (HCatSplit) splits.get(i);
+      hCatSplits.add(hsSplit);
+      this.inputLength += hsSplit.getLength();
+      locations.addAll(Arrays.asList(hsSplit.getLocations()));
+    }
+    this.hCatLocations = locations.toArray(new String[0]);
+  }
+
+  public int length() {
+    return this.hCatSplits.size();
+  }
+
+  public HCatSplit get(int index) {
+    return this.hCatSplits.get(index);
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    if (this.inputLength == 0L) {
+      for (HCatSplit split : this.hCatSplits) {
+        this.inputLength += split.getLength();
+      }
+    }
+    return this.inputLength;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    if (this.hCatLocations == null) {
+      Set<String> locations = new HashSet<String>();
+      for (HCatSplit split : this.hCatSplits) {
+        locations.addAll(Arrays.asList(split.getLocations()));
+      }
+      this.hCatLocations = locations.toArray(new String[0]);
+    }
+    return this.hCatLocations;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(this.inputLength);
+    out.writeInt(this.hCatSplits.size());
+    for (HCatSplit split : this.hCatSplits) {
+      split.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.inputLength = in.readLong();
+    int size = in.readInt();
+    this.hCatSplits = new ArrayList<HCatSplit>(size);
+    for (int i = 0; i < size; ++i) {
+      HCatSplit hs = new HCatSplit();
+      hs.readFields(in);
+      hCatSplits.add(hs);
+    }
+  }
+}
+
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatRecordReader.java
new file mode 100644 (file)
index 0000000..55604f7
--- /dev/null
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * A Record Reader that can combine underlying splits.
+ */
+public class SqoopHCatRecordReader extends
+  RecordReader<WritableComparable, HCatRecord> {
+  private final SqoopHCatExportFormat hCatExportFormat;
+  private SqoopHCatInputSplit hCatSplit;
+  private TaskAttemptContext context;
+  private int subIndex;
+  private long progress;
+
+  private RecordReader<WritableComparable, HCatRecord> curReader;
+
+  public static final Log LOG = LogFactory
+    .getLog(SqoopHCatRecordReader.class.getName());
+
+  public SqoopHCatRecordReader(final InputSplit split,
+    final TaskAttemptContext context, final SqoopHCatExportFormat inputFormat)
+    throws IOException {
+    this.hCatSplit = (SqoopHCatInputSplit) split;
+    this.context = context;
+    this.subIndex = 0;
+    this.curReader = null;
+    this.progress = 0L;
+    this.hCatExportFormat = inputFormat;
+
+    initNextRecordReader();
+  }
+
+  @Override
+  public void initialize(final InputSplit split,
+    final TaskAttemptContext ctxt)
+    throws IOException, InterruptedException {
+    this.hCatSplit = (SqoopHCatInputSplit) split;
+    this.context = ctxt;
+
+    if (null != this.curReader) {
+      this.curReader.initialize(((SqoopHCatInputSplit) split)
+        .get(0), context);
+    }
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    while (this.curReader == null || !this.curReader.nextKeyValue()) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public WritableComparable getCurrentKey() throws IOException,
+    InterruptedException {
+    return this.curReader.getCurrentKey();
+  }
+
+  @Override
+  public HCatRecord getCurrentValue() throws IOException, InterruptedException {
+    return this.curReader.getCurrentValue();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.curReader != null) {
+      this.curReader.close();
+      this.curReader = null;
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    long subprogress = 0L;
+    if (null != this.curReader) {
+      subprogress = (long) (this.curReader.getProgress()
+        * this.hCatSplit.get(this.subIndex - 1).getLength());
+    }
+    // Indicate the total processed count.
+    return Math.min(1.0F, (this.progress + subprogress)
+      / (float) this.hCatSplit.getLength());
+  }
+
+  protected boolean initNextRecordReader() throws IOException {
+    if (this.curReader != null) {
+      // close current record reader if open
+      this.curReader.close();
+      this.curReader = null;
+      if (this.subIndex > 0) {
+        this.progress +=
+          this.hCatSplit.get(this.subIndex - 1).getLength();
+      }
+      LOG.debug("Closed current reader.  Current progress = " + progress);
+    }
+
+    if (this.subIndex == this.hCatSplit.length()) {
+      LOG.debug("Done with all splits");
+      return false;
+    }
+
+    try {
+      // get a record reader for the subsplit-index chunk
+
+      this.curReader = this.hCatExportFormat.createHCatRecordReader(
+        this.hCatSplit.get(this.subIndex), this.context);
+
+      LOG.debug("Created a HCatRecordReader for split " + subIndex);
+      // initialize() for the first RecordReader will be called by MapTask;
+      // we're responsible for initializing subsequent RecordReaders.
+      if (this.subIndex > 0) {
+        this.curReader.initialize(this.hCatSplit.get(this.subIndex),
+          this.context);
+        LOG.info("Initialized reader with current split");
+      }
+    } catch (Exception e) {
+      throw new IOException("Error initializing HCat record reader", e);
+    }
+    LOG.debug("Created record reader for subsplit " + subIndex);
+    ++this.subIndex;
+    return true;
+  }
+}
+
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
new file mode 100644 (file)
index 0000000..a109b40
--- /dev/null
@@ -0,0 +1,1215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.hcat;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.apache.sqoop.hive.HiveTypes;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.LoggingAsyncSink;
+import org.apache.sqoop.util.SubprocessSecurityManager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.util.ExitSecurityException;
+
+/**
+ * Utility methods for the HCatalog support for Sqoop.
+ */
+public final class SqoopHCatUtilities {
+  public static final String DEFHCATDB = "default";
+  public static final String HIVESITEXMLPATH = "/conf/hive-site.xml";
+  public static final String HCATSHAREDIR = "share/hcatalog";
+  public static final String DEFLIBDIR = "lib";
+  public static final String TEXT_FORMAT_IF_CLASS =
+    "org.apache.hadoop.mapred.TextInputFormat";
+  public static final String TEXT_FORMAT_OF_CLASS =
+    "org.apache.hadoop.mapred.TextOutputFormat";
+  public static final String TEXT_FORMAT_SERDE_CLASS =
+    "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+  public static final String HCAT_DB_OUTPUT_COLTYPES_JAVA =
+    "sqoop.hcat.db.output.coltypes.java";
+  public static final String HCAT_DB_OUTPUT_COLTYPES_SQL =
+    "sqoop.hcat.db.output.coltypes.sql";
+  public static final String HCAT_CLI_MAIN_CLASS =
+    "org.apache.hcatalog.cli.HCatCli";
+  public static final String HCAT_DEF_STORAGE_STANZA = "stored as rcfile";
+  public static final String HIVE_DELIMITERS_TO_REPLACE_PROP =
+    "sqoop.hive.delims.to.replace";
+  public static final String HIVE_DELIMITERS_REPLACEMENT_PROP =
+    "sqoop.hive.delims.replacement";
+  public static final String HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP =
+    "sqoop.hive.delims.replacement.enabled";
+  public static final String HCAT_STATIC_PARTITION_KEY_PROP =
+    "sqoop.hcat.partition.key";
+  public static final String HCAT_FIELD_POSITIONS_PROP =
+    "sqoop.hcat.field.positions";
+  public static final String DEBUG_HCAT_IMPORT_MAPPER_PROP =
+    "sqoop.hcat.debug.import.mapper";
+  public static final String DEBUG_HCAT_EXPORT_MAPPER_PROP =
+    "sqoop.hcat.debug.export.mapper";
+  private static final String HCATCMD = Shell.WINDOWS ? "hcat.cmd" : "hcat";
+  private SqoopOptions options;
+  private ConnManager connManager;
+  private String hCatTableName;
+  private String hCatDatabaseName;
+  private Configuration configuration;
+  private Job hCatJob;
+  private HCatSchema hCatOutputSchema;
+  private HCatSchema hCatPartitionSchema;
+  private HCatSchema projectedSchema;
+  private boolean configured;
+
+  private String hCatQualifiedTableName;
+  private String hCatStaticPartitionKey;
+  private List<String> hCatDynamicPartitionKeys;
+  // DB stuff
+  private String[] dbColumnNames;
+  private String dbTableName;
+  private LCKeyMap<Integer> dbColumnTypes;
+
+  private Map<String, Integer> externalColTypes;
+
+  private int[] hCatFieldPositions; // For each DB column, HCat position
+
+  private HCatSchema hCatFullTableSchema;
+  private List<String> hCatFullTableSchemaFieldNames;
+  private LCKeyMap<String> userHiveMapping;
+
+  // For testing support
+  private static Class<? extends InputFormat> inputFormatClass =
+    SqoopHCatExportFormat.class;
+
+  private static Class<? extends OutputFormat> outputFormatClass =
+    HCatOutputFormat.class;
+
+  private static Class<? extends Mapper> exportMapperClass =
+    SqoopHCatExportMapper.class;
+
+  private static Class<? extends Mapper> importMapperClass =
+    SqoopHCatImportMapper.class;
+
+  private static Class<? extends Writable> importValueClass =
+    DefaultHCatRecord.class;
+
+  private static boolean testMode = false;
+
+  static class IntArrayWritable extends ArrayWritable {
+    public IntArrayWritable() {
+      super(IntWritable.class);
+    }
+  }
+
+  /**
+   * A Map using String as key type that ignores case of its key and stores the
+   * key in lower case.
+   */
+  private static class LCKeyMap<V> extends HashMap<String, V> {
+
+    private static final long serialVersionUID = -6751510232323094216L;
+
+    @Override
+    public V put(String key, V value) {
+      return super.put(key.toLowerCase(), value);
+    }
+
+    @Override
+    public V get(Object key) {
+      return super.get(((String) key).toLowerCase());
+    }
+  }
+
+  /**
+   * A Map using String as key type that ignores case of its key and stores the
+   * key in upper case.
+   */
+  public class UCKeyMap<V> extends HashMap<String, V> {
+
+    private static final long serialVersionUID = -6751510232323094216L;
+
+    @Override
+    public V put(String key, V value) {
+      return super.put(key.toUpperCase(), value);
+    }
+
+    @Override
+    public V get(Object key) {
+      return super.get(((String) key).toUpperCase());
+    }
+  }
+
+  /**
+   * A class to hold the instance. For guaranteeing singleton creation using JMM
+   * semantics.
+   */
+  public static final class Holder {
+    @SuppressWarnings("synthetic-access")
+    public static final SqoopHCatUtilities INSTANCE = new SqoopHCatUtilities();
+
+    private Holder() {
+    }
+  }
+
+  public static SqoopHCatUtilities instance() {
+    return Holder.INSTANCE;
+  }
+
+  private SqoopHCatUtilities() {
+    configured = false;
+  }
+
+  public static final Log LOG = LogFactory.getLog(SqoopHCatUtilities.class
+    .getName());
+
+  public boolean isConfigured() {
+    return configured;
+  }
+
+  public void configureHCat(final SqoopOptions opts, final Job job,
+    final ConnManager connMgr, final String dbTable,
+    final Configuration config) throws IOException {
+    if (configured) {
+      LOG.info("Ignoring configuration request for HCatalog info");
+      return;
+    }
+    options = opts;
+
+    LOG.info("Configuring HCatalog specific details for job");
+
+    String home = opts.getHiveHome();
+
+    if (home == null || home.length() == 0) {
+      LOG.warn("Hive home is not set. job may fail if needed jar files "
+        + "are not found correctly.  Please set HIVE_HOME in"
+        + " sqoop-env.sh or provide --hive-home option.  Setting HIVE_HOME "
+        + " to " + SqoopOptions.getHiveHomeDefault());
+    }
+
+    home = opts.getHCatHome();
+    if (home == null || home.length() == 0) {
+      LOG.warn("HCatalog home is not set. job may fail if needed jar "
+        + "files are not found correctly.  Please set HCAT_HOME in"
+        + " sqoop-env.sh or provide --hcatalog-home option.  "
+        + " Setting HCAT_HOME to " + SqoopOptions.getHCatHomeDefault());
+    }
+    connManager = connMgr;
+    dbTableName = dbTable;
+    configuration = config;
+    hCatJob = job;
+    hCatDatabaseName = options.getHCatDatabaseName() != null ? options
+      .getHCatDatabaseName() : DEFHCATDB;
+    hCatDatabaseName = hCatDatabaseName.toLowerCase();
+
+    String optHCTabName = options.getHCatTableName();
+    hCatTableName = optHCTabName.toLowerCase();
+
+    if (!hCatTableName.equals(optHCTabName)) {
+      LOG.warn("Provided HCatalog table name " + optHCTabName
+        + " will be mapped to  " + hCatTableName);
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(hCatDatabaseName);
+    sb.append('.').append(hCatTableName);
+    hCatQualifiedTableName = sb.toString();
+
+    String principalID = System
+      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+    if (principalID != null) {
+      configuration.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
+    }
+    hCatStaticPartitionKey = options.getHivePartitionKey();
+
+    Properties userMapping = options.getMapColumnHive();
+    userHiveMapping = new LCKeyMap<String>();
+    for (Object o : userMapping.keySet()) {
+      String v = (String) userMapping.get(o);
+      userHiveMapping.put((String) o, v);
+    }
+    // Get the partition key filter if needed
+    Map<String, String> filterMap = getHCatSPFilterMap();
+    String filterStr = getHCatSPFilterStr();
+    initDBColumnNamesAndTypes();
+    if (options.doCreateHCatalogTable()) {
+      LOG.info("Creating HCatalog table " + hCatQualifiedTableName
+        + " for import");
+      createHCatTable();
+    }
+    // For serializing the schema to conf
+    HCatInputFormat hif = HCatInputFormat.setInput(hCatJob, hCatDatabaseName,
+      hCatTableName);
+    // For serializing the schema to conf
+    if (filterStr != null) {
+      LOG.info("Setting hCatInputFormat filter to " + filterStr);
+      hif.setFilter(filterStr);
+    }
+
+    hCatFullTableSchema = HCatInputFormat.getTableSchema(configuration);
+    hCatFullTableSchemaFieldNames = hCatFullTableSchema.getFieldNames();
+
+    LOG.info("HCatalog full table schema fields = "
+      + Arrays.toString(hCatFullTableSchema.getFieldNames().toArray()));
+
+    if (filterMap != null) {
+      LOG.info("Setting hCatOutputFormat filter to " + filterStr);
+    }
+
+    HCatOutputFormat.setOutput(hCatJob,
+      OutputJobInfo.create(hCatDatabaseName, hCatTableName, filterMap));
+    hCatOutputSchema = HCatOutputFormat.getTableSchema(configuration);
+    List<HCatFieldSchema> hCatPartitionSchemaFields =
+      new ArrayList<HCatFieldSchema>();
+    int totalFieldsCount = hCatFullTableSchema.size();
+    int dataFieldsCount = hCatOutputSchema.size();
+    if (totalFieldsCount > dataFieldsCount) {
+      for (int i = dataFieldsCount; i < totalFieldsCount; ++i) {
+        hCatPartitionSchemaFields.add(hCatFullTableSchema.get(i));
+      }
+    }
+
+    hCatPartitionSchema = new HCatSchema(hCatPartitionSchemaFields);
+    for (HCatFieldSchema hfs : hCatPartitionSchemaFields) {
+      if (hfs.getType() != HCatFieldSchema.Type.STRING) {
+        throw new IOException("The table provided "
+          + getQualifiedHCatTableName()
+          + " uses unsupported  partitioning key type  for column "
+          + hfs.getName() + " : " + hfs.getTypeString() + ".  Only string "
+          + "fields are allowed in partition columns in HCatalog");
+      }
+
+    }
+    LOG.info("HCatalog table partitioning key fields = "
+      + Arrays.toString(hCatPartitionSchema.getFieldNames().toArray()));
+
+    List<HCatFieldSchema> outputFieldList = new ArrayList<HCatFieldSchema>();
+    for (String col : dbColumnNames) {
+      HCatFieldSchema hfs = hCatFullTableSchema.get(col);
+      if (hfs == null) {
+        throw new IOException("Database column " + col + " not found in "
+          + " hcatalog table.");
+      }
+      if (hCatStaticPartitionKey != null
+        && hCatStaticPartitionKey.equals(col)) {
+        continue;
+      }
+      outputFieldList.add(hCatFullTableSchema.get(col));
+    }
+
+    projectedSchema = new HCatSchema(outputFieldList);
+
+    LOG.info("HCatalog projected schema fields = "
+      + Arrays.toString(projectedSchema.getFieldNames().toArray()));
+
+    validateStaticPartitionKey();
+    validateHCatTableFieldTypes();
+
+    HCatOutputFormat.setSchema(configuration, hCatFullTableSchema);
+
+    addJars(hCatJob, options);
+    config.setBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP,
+      Boolean.getBoolean(DEBUG_HCAT_IMPORT_MAPPER_PROP));
+    config.setBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP,
+      Boolean.getBoolean(DEBUG_HCAT_EXPORT_MAPPER_PROP));
+    configured = true;
+  }
+
+  public void validateDynamicPartitionKeysMapping() throws IOException {
+    // Now validate all partition columns are in the database column list
+    StringBuilder missingKeys = new StringBuilder();
+
+    for (String s : hCatDynamicPartitionKeys) {
+      boolean found = false;
+      for (String c : dbColumnNames) {
+        if (s.equals(c)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        missingKeys.append(',').append(s);
+      }
+    }
+    if (missingKeys.length() > 0) {
+      throw new IOException("Dynamic partition keys are not "
+        + "present in the database columns.   Missing keys = "
+        + missingKeys.substring(1));
+    }
+  }
+
+  public void validateHCatTableFieldTypes() throws IOException {
+    StringBuilder sb = new StringBuilder();
+    boolean hasComplexFields = false;
+    for (HCatFieldSchema hfs : projectedSchema.getFields()) {
+      if (hfs.isComplex()) {
+        sb.append('.').append(hfs.getName());
+        hasComplexFields = true;
+      }
+    }
+
+    if (hasComplexFields) {
+      String unsupportedFields = sb.substring(1);
+      throw new IOException("The HCatalog table provided "
+        + getQualifiedHCatTableName() + " has complex field types ("
+        + unsupportedFields + ").  They are currently not supported");
+    }
+
+  }
+
+  /**
+   * Get the column names to import.
+   */
+  private void initDBColumnNamesAndTypes() throws IOException {
+    String[] colNames = options.getColumns();
+    if (null == colNames) {
+      if (null != externalColTypes) {
+        // Test-injection column mapping. Extract the col names from
+        ArrayList<String> keyList = new ArrayList<String>();
+        for (String key : externalColTypes.keySet()) {
+          keyList.add(key);
+        }
+        colNames = keyList.toArray(new String[keyList.size()]);
+      } else if (null != dbTableName) {
+        colNames = connManager.getColumnNames(dbTableName);
+      } else if (options.getCall() != null) {
+        // Read procedure arguments from metadata
+        colNames = connManager.getColumnNamesForProcedure(this.options
+          .getCall());
+      } else {
+        colNames = connManager.getColumnNamesForQuery(options.getSqlQuery());
+      }
+    }
+
+    dbColumnNames = new String[colNames.length];
+
+    for (int i = 0; i < colNames.length; ++i) {
+      dbColumnNames[i] = colNames[i].toLowerCase();
+    }
+
+    LCKeyMap<Integer> colTypes = new LCKeyMap<Integer>();
+    if (externalColTypes != null) { // Use pre-defined column types.
+      colTypes.putAll(externalColTypes);
+    } else { // Get these from the database.
+      if (dbTableName != null) {
+        colTypes.putAll(connManager.getColumnTypes(dbTableName));
+      } else if (options.getCall() != null) {
+        // Read procedure arguments from metadata
+        colTypes.putAll(connManager.getColumnTypesForProcedure(this.options
+          .getCall()));
+      } else {
+        colTypes.putAll(connManager.getColumnTypesForQuery(options
+          .getSqlQuery()));
+      }
+    }
+
+    if (options.getColumns() == null) {
+      dbColumnTypes = colTypes;
+    } else {
+      dbColumnTypes = new LCKeyMap<Integer>();
+      // prune column types based on projection
+      for (String col : dbColumnNames) {
+        Integer type = colTypes.get(col);
+        if (type == null) {
+          throw new IOException("Projected column " + col
+            + " not in list of columns from database");
+        }
+        dbColumnTypes.put(col, type);
+      }
+    }
+    LOG.info("Database column names projected : "
+      + Arrays.toString(dbColumnNames));
+    LOG.info("Database column name - type map :\n\tNames: "
+      + Arrays.toString(dbColumnTypes.keySet().toArray()) + "\n\tTypes : "
+      + Arrays.toString(dbColumnTypes.values().toArray()));
+  }
+
+  private void createHCatTable() throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("create table ").
+      append(hCatDatabaseName).append('.');
+    sb.append(hCatTableName).append(" (\n\t");
+    boolean first = true;
+    for (String col : dbColumnNames) {
+      String type = userHiveMapping.get(col);
+      if (type == null) {
+        type = connManager.toHCatType(dbColumnTypes.get(col));
+      }
+      if (hCatStaticPartitionKey != null
+        && col.equals(hCatStaticPartitionKey)) {
+        continue;
+      }
+      if (first) {
+        first = false;
+      } else {
+        sb.append(",\n\t");
+      }
+      sb.append(col).append(' ').append(type);
+    }
+    sb.append(")\n");
+    if (hCatStaticPartitionKey != null) {
+      sb.append("partitioned by (\n\t");
+      sb.append(hCatStaticPartitionKey).append(" string)\n");
+    }
+    String storageStanza = options.getHCatStorageStanza();
+    if (storageStanza == null) {
+      sb.append(HCAT_DEF_STORAGE_STANZA);
+    } else {
+      sb.append(storageStanza);
+    }
+    String createStatement = sb.toString();
+    LOG.info("HCatalog Create table statement: \n\n" + createStatement);
+    // Always launch as an external program so that logging is not messed
+    // up by the use of inline hive CLI except in tests
+    // We prefer external HCAT client.
+    launchHCatCli(createStatement);
+  }
+
+
+  private void validateFieldAndColumnMappings() throws IOException {
+    // Check that all explicitly mapped columns are present
+    for (Object column : userHiveMapping.keySet()) {
+      boolean found = false;
+      for (String c : dbColumnNames) {
+        if (c.equalsIgnoreCase((String) column)) {
+          found = true;
+          break;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Column " + column
+          + " not found while mapping database columns to hcatalog columns");
+      }
+    }
+
+    hCatFieldPositions = new int[dbColumnNames.length];
+
+    Arrays.fill(hCatFieldPositions, -1);
+
+    for (int indx = 0; indx < dbColumnNames.length; ++indx) {
+      boolean userMapped = false;
+      String col = dbColumnNames[indx];
+      Integer colType = dbColumnTypes.get(col);
+      String hCatColType = userHiveMapping.get(col);
+      if (hCatColType == null) {
+        LOG.debug("No user defined type mapping for HCatalog field " + col);
+        hCatColType = connManager.toHCatType(colType);
+      } else {
+        LOG.debug("Found type mapping for HCatalog filed " + col);
+        userMapped = true;
+      }
+      if (null == hCatColType) {
+        throw new IOException("HCat does not support the SQL type for column "
+          + col);
+      }
+
+      boolean found = false;
+      for (String tf : hCatFullTableSchemaFieldNames) {
+        if (tf.equals(col)) {
+          found = true;
+          break;
+        }
+      }
+
+      if (!found) {
+        throw new IOException("Database column " + col + " not found in "
+          + "hcatalog table schema or partition schema");
+      }
+      if (!userMapped) {
+        HCatFieldSchema hCatFS = hCatFullTableSchema.get(col);
+        if (!hCatFS.getTypeString().equals(hCatColType)) {
+          LOG.warn("The HCatalog field " + col + " has type "
+            + hCatFS.getTypeString() + ".  Expected = " + hCatColType
+            + " based on database column type : " + sqlTypeString(colType));
+          LOG.warn("The Sqoop job can fail if types are not "
+            + " assignment compatible");
+        }
+      }
+
+      if (HiveTypes.isHiveTypeImprovised(colType)) {
+        LOG.warn("Column " + col + " had to be cast to a less precise type "
+          + hCatColType + " in hcatalog");
+      }
+      hCatFieldPositions[indx] = hCatFullTableSchemaFieldNames.indexOf(col);
+      if (hCatFieldPositions[indx] < 0) {
+        throw new IOException("The HCatalog field " + col
+          + " could not be found");
+      }
+    }
+
+    IntWritable[] positions = new IntWritable[hCatFieldPositions.length];
+    for (int i : hCatFieldPositions) {
+      positions[i] = new IntWritable(hCatFieldPositions[i]);
+    }
+
+    DefaultStringifier.storeArray(configuration, positions,
+      HCAT_FIELD_POSITIONS_PROP);
+  }
+
+  private String getHCatSPFilterStr() {
+    if (hCatStaticPartitionKey != null) {
+      StringBuilder filter = new StringBuilder();
+      filter.append(options.getHivePartitionKey()).append('=').append('\'')
+        .append(options.getHivePartitionValue()).append('\'');
+      return filter.toString();
+    }
+    return null;
+  }
+
+  private Map<String, String> getHCatSPFilterMap() {
+    if (hCatStaticPartitionKey != null) {
+      Map<String, String> filter = new HashMap<String, String>();
+      filter
+        .put(options.getHivePartitionKey(), options.getHivePartitionValue());
+      return filter;
+    }
+    return null;
+  }
+
+  private void validateStaticPartitionKey() throws IOException {
+    // check the static partition key from command line
+    List<HCatFieldSchema> partFields = hCatPartitionSchema.getFields();
+
+    if (hCatStaticPartitionKey != null) {
+      boolean found = false;
+      for (HCatFieldSchema hfs : partFields) {
+        if (hfs.getName().equals(hCatStaticPartitionKey)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        throw new IOException("The provided hive partition key "
+          + hCatStaticPartitionKey + " is not part of the partition "
+          + " keys for table " + getQualifiedHCatTableName());
+      }
+    }
+    hCatDynamicPartitionKeys = new ArrayList<String>();
+    hCatDynamicPartitionKeys.addAll(hCatPartitionSchema.getFieldNames());
+    if (hCatStaticPartitionKey != null) {
+      hCatDynamicPartitionKeys.remove(hCatStaticPartitionKey);
+    }
+    configuration.set(HCAT_STATIC_PARTITION_KEY_PROP,
+      hCatStaticPartitionKey == null ? "" : hCatStaticPartitionKey);
+  }
+
+  public static void configureImportOutputFormat(SqoopOptions opts, Job job,
+    ConnManager connMgr, String dbTable, Configuration config)
+    throws IOException {
+
+    LOG.info("Configuring HCatalog for import job");
+    SqoopHCatUtilities.instance().configureHCat(opts, job, connMgr, dbTable,
+      job.getConfiguration());
+    LOG.info("Validating dynamic partition keys");
+    SqoopHCatUtilities.instance().validateFieldAndColumnMappings();
+    SqoopHCatUtilities.instance().validateDynamicPartitionKeysMapping();
+    job.setOutputFormatClass(getOutputFormatClass());
+    IntWritable[] delimChars = new IntWritable[5];
+    String hiveReplacement = "";
+    LOG.debug("Hive delimiters will be fixed during import");
+    DelimiterSet delims = opts.getOutputDelimiters();
+    if (!opts.explicitOutputDelims()) {
+      delims = DelimiterSet.HIVE_DELIMITERS;
+    }
+    delimChars = new IntWritable[] {
+      new IntWritable(delims.getFieldsTerminatedBy()),
+      new IntWritable(delims.getLinesTerminatedBy()),
+      new IntWritable(delims.getEnclosedBy()),
+      new IntWritable(delims.getEscapedBy()),
+      new IntWritable(delims.isEncloseRequired() ? 1 : 0), };
+    hiveReplacement = opts.getHiveDelimsReplacement();
+    if (hiveReplacement == null) {
+      hiveReplacement = "";
+    }
+
+    LOG.debug("Setting hive delimiters information");
+    DefaultStringifier.storeArray(config, delimChars,
+      HIVE_DELIMITERS_TO_REPLACE_PROP);
+    config.set(HIVE_DELIMITERS_REPLACEMENT_PROP, hiveReplacement);
+    if (opts.doHiveDropDelims() || opts.getHiveDelimsReplacement() != null) {
+      LOG.debug("Enabling hive delimter replacement");
+      config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "true");
+    } else {
+      LOG.debug("Disabling hive delimter replacement");
+      config.set(HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP, "false");
+    }
+  }
+
+  public static void configureExportInputFormat(SqoopOptions opts, Job job,
+    ConnManager connMgr, String dbTable, Configuration config)
+    throws IOException {
+
+    LOG.info("Configuring HCatalog for export job");
+    SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
+    hCatUtils
+      .configureHCat(opts, job, connMgr, dbTable, job.getConfiguration());
+    job.setInputFormatClass(getInputFormatClass());
+    Map<String, Integer> dbColTypes = hCatUtils.getDbColumnTypes();
+    MapWritable columnTypesJava = new MapWritable();
+    for (Map.Entry<String, Integer> e : dbColTypes.entrySet()) {
+      Text columnName = new Text(e.getKey());
+      Text columnText = new Text(connMgr.toJavaType(dbTable, e.getKey(),
+        e.getValue()));
+      columnTypesJava.put(columnName, columnText);
+    }
+    MapWritable columnTypesSql = new MapWritable();
+    for (Map.Entry<String, Integer> e : dbColTypes.entrySet()) {
+      Text columnName = new Text(e.getKey());
+      IntWritable sqlType = new IntWritable(e.getValue());
+      columnTypesSql.put(columnName, sqlType);
+    }
+    DefaultStringifier.store(config, columnTypesJava,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA);
+    DefaultStringifier.store(config, columnTypesSql,
+      SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL);
+  }
+
+  /**
+   * Add the Hive and HCatalog jar files to local classpath and dist cache.
+   * @throws IOException
+   */
+  public static void addJars(Job job, SqoopOptions options) throws IOException {
+
+    if (isLocalJobTracker(job)) {
+      LOG.info("Not adding hcatalog jars to distributed cache in local mode");
+      return;
+    }
+    Configuration conf = job.getConfiguration();
+    String hiveHome = null;
+    String hCatHome = null;
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (options != null) {
+      hiveHome = options.getHiveHome();
+    }
+    if (hiveHome == null) {
+      hiveHome = SqoopOptions.getHiveHomeDefault();
+    }
+    if (options != null) {
+      hCatHome = options.getHCatHome();
+    }
+    if (hCatHome == null) {
+      hCatHome = SqoopOptions.getHCatHomeDefault();
+    }
+    LOG.info("HCatalog job : Hive Home = " + hiveHome);
+    LOG.info("HCatalog job:  HCatalog Home = " + hCatHome);
+
+    conf.addResource(hiveHome + HIVESITEXMLPATH);
+
+    // Add these to the 'tmpjars' array, which the MR JobSubmitter
+    // will upload to HDFS and put in the DistributedCache libjars.
+    List<String> libDirs = new ArrayList<String>();
+    libDirs.add(hCatHome + File.separator + HCATSHAREDIR);
+    libDirs.add(hCatHome + File.separator + DEFLIBDIR);
+    libDirs.add(hiveHome + File.separator + DEFLIBDIR);
+    Set<String> localUrls = new HashSet<String>();
+    // Add any libjars already specified
+    localUrls
+      .addAll(conf
+        .getStringCollection(
+        ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));
+    for (String dir : libDirs) {
+      LOG.info("Adding jar files under " + dir + " to distributed cache");
+      addDirToCache(new File(dir), fs, localUrls, false);
+    }
+
+    // Recursively add all hcatalog storage handler jars
+    // The HBase storage handler is getting deprecated post Hive+HCat merge
+    String hCatStorageHandlerDir = hCatHome + File.separator
+      + "share/hcatalog/storage-handlers";
+    LOG.info("Adding jar files under " + hCatStorageHandlerDir
+      + " to distributed cache (recursively)");
+
+    addDirToCache(new File(hCatStorageHandlerDir), fs, localUrls, true);
+
+    String tmpjars = conf
+      .get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
+    StringBuilder sb = new StringBuilder(1024);
+    if (null != tmpjars) {
+      sb.append(tmpjars);
+      sb.append(",");
+    }
+    sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
+    conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, sb.toString());
+  }
+
+  /**
+   * Add the .jar elements of a directory to the DCache classpath, optionally
+   * recursively.
+   */
+  private static void addDirToCache(File dir, FileSystem fs,
+    Set<String> localUrls, boolean recursive) {
+    if (dir == null) {
+      return;
+    }
+
+    File[] fileList = dir.listFiles();
+
+    if (fileList == null) {
+      LOG.warn("No files under " + dir
+        + " to add to distributed cache for hcatalog job");
+      return;
+    }
+
+    for (File libFile : dir.listFiles()) {
+      if (libFile.exists() && !libFile.isDirectory()
+        && libFile.getName().endsWith("jar")) {
+        Path p = new Path(libFile.toString());
+        if (libFile.canRead()) {
+          String qualified = p.makeQualified(fs).toString();
+          LOG.info("Adding to job classpath: " + qualified);
+          localUrls.add(qualified);
+        } else {
+          LOG.warn("Ignoring unreadable file " + libFile);
+        }
+      }
+      if (recursive && libFile.isDirectory()) {
+        addDirToCache(libFile, fs, localUrls, recursive);
+      }
+    }
+  }
+
+  public static boolean isHadoop1() {
+    String version = org.apache.hadoop.util.VersionInfo.getVersion();
+    if (version.matches("\\b0\\.20\\..+\\b")
+      || version.matches("\\b1\\.\\d\\.\\d")) {
+      return true;
+    }
+    return false;
+  }
+
+  public static boolean isLocalJobTracker(Job job) {
+    Configuration conf = job.getConfiguration();
+    // If framework is set to YARN, then we can't be running in local mode
+    if ("yarn".equalsIgnoreCase(conf
+      .get(ConfigurationConstants.PROP_MAPREDUCE_FRAMEWORK_NAME))) {
+      return false;
+    }
+    String jtAddr = conf
+      .get(ConfigurationConstants.PROP_MAPRED_JOB_TRACKER_ADDRESS);
+    String jtAddr2 = conf
+      .get(ConfigurationConstants.PROP_MAPREDUCE_JOB_TRACKER_ADDRESS);
+    return (jtAddr != null && jtAddr.equals("local"))
+      || (jtAddr2 != null && jtAddr2.equals("local"));
+  }
+
+  public void invokeOutputCommitterForLocalMode(Job job) throws IOException {
+    if (isLocalJobTracker(job) && isHadoop1()) {
+      LOG.info("Explicitly committing job in local mode");
+      HCatHadoopShims.Instance.get().commitJob(new HCatOutputFormat(), job);
+    }
+  }
+
+  public void launchHCatCli(String cmdLine)
+    throws IOException {
+    String tmpFileName = null;
+
+
+    String tmpDir = System.getProperty("java.io.tmpdir");
+    if (options != null) {
+      tmpDir = options.getTempDir();
+    }
+    tmpFileName =
+      new File(tmpDir, "hcat-script-"
+        + System.currentTimeMillis()).getAbsolutePath();
+
+    writeHCatScriptFile(tmpFileName, cmdLine);
+    // Create the argv for the HCatalog Cli Driver.
+    String[] argArray = new String[2];
+    argArray[0] = "-f";
+    argArray[1] = tmpFileName;
+    String argLine = StringUtils.join(",", Arrays.asList(argArray));
+
+    if (testMode) {
+      LOG.debug("Executing HCatalog CLI in-process with " + argLine);
+      executeHCatProgramInProcess(argArray);
+    } else {
+      LOG.info("Executing external HCatalog CLI process with args :" + argLine);
+      executeExternalHCatProgram(Executor.getCurEnvpStrings(), argArray);
+    }
+  }
+
+  public void writeHCatScriptFile(String fileName, String contents)
+    throws IOException {
+    BufferedWriter w = null;
+    try {
+      FileOutputStream fos = new FileOutputStream(fileName);
+      w = new BufferedWriter(new OutputStreamWriter(fos));
+      w.write(contents, 0, contents.length());
+    } catch (IOException ioe) {
+      LOG.error("Error writing HCatalog load-in script", ioe);
+      throw ioe;
+    } finally {
+      if (null != w) {
+        try {
+          w.close();
+        } catch (IOException ioe) {
+          LOG.warn("IOException closing stream to HCatalog script", ioe);
+        }
+      }
+    }
+  }
+
+  /**
+   * Execute HCat via an external 'bin/hcat' process.
+   * @param env
+   *          the environment strings to pass to any subprocess.
+   * @throws IOException
+   *           if HCatalog did not exit successfully.
+   */
+  public void executeExternalHCatProgram(List<String> env, String[] cmdLine)
+    throws IOException {
+    // run HCat command with the given args
+    String hCatProgram = getHCatPath();
+    ArrayList<String> args = new ArrayList<String>();
+    args.add(hCatProgram);
+    if (cmdLine != null && cmdLine.length > 0) {
+      for (String s : cmdLine) {
+        args.add(s);
+      }
+    }
+    LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+    int ret = Executor.exec(args.toArray(new String[0]),
+      env.toArray(new String[0]), logSink, logSink);
+    if (0 != ret) {
+      throw new IOException("HCat exited with status " + ret);
+    }
+  }
+
+  public void executeHCatProgramInProcess(String[] argv) throws IOException {
+    SubprocessSecurityManager subprocessSM = null;
+
+    try {
+      Class<?> cliDriverClass = Class.forName(HCAT_CLI_MAIN_CLASS);
+      subprocessSM = new SubprocessSecurityManager();
+      subprocessSM.install();
+      Method mainMethod = cliDriverClass.getMethod("main", argv.getClass());
+      mainMethod.invoke(null, (Object) argv);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("HCatalog class not found", cnfe);
+    } catch (NoSuchMethodException nsme) {
+      throw new IOException("Could not access HCatCli.main()", nsme);
+    } catch (IllegalAccessException iae) {
+      throw new IOException("Could not access HatCli.main()", iae);
+    } catch (InvocationTargetException ite) {
+      // This may have been the ExitSecurityException triggered by the
+      // SubprocessSecurityManager.
+      Throwable cause = ite.getCause();
+      if (cause instanceof ExitSecurityException) {
+        ExitSecurityException ese = (ExitSecurityException) cause;
+        int status = ese.getExitStatus();
+        if (status != 0) {
+          throw new IOException("HCatCli  exited with status=" + status);
+        }
+      } else {
+        throw new IOException("Exception thrown from HCatCli", ite);
+      }
+    } finally {
+      if (null != subprocessSM) {
+        subprocessSM.uninstall();
+      }
+    }
+  }
+
+  /**
+   * @return the filename of the hcat executable to run to do the import
+   */
+  public String getHCatPath() {
+    String hCatHome = null;
+    if (options == null) {
+      hCatHome = SqoopOptions.getHCatHomeDefault();
+    } else {
+      hCatHome = options.getHCatHome();
+    }
+
+    if (null == hCatHome) {
+      return null;
+    }
+
+    Path p = new Path(hCatHome);
+    p = new Path(p, "bin");
+    p = new Path(p, HCATCMD);
+    String hCatBinStr = p.toString();
+    if (new File(hCatBinStr).canExecute()) {
+      return hCatBinStr;
+    } else {
+      return null;
+    }
+  }
+
+  public static boolean isTestMode() {
+    return testMode;
+  }
+
+  public static void setTestMode(boolean mode) {
+    testMode = mode;
+  }
+
+  public static Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public static Class<? extends OutputFormat> getOutputFormatClass() {
+    return outputFormatClass;
+  }
+
+  public static void setInputFormatClass(Class<? extends InputFormat> clz) {
+    inputFormatClass = clz;
+  }
+
+  public static void setOutputFormatClass(Class<? extends OutputFormat> clz) {
+    outputFormatClass = clz;
+  }
+
+  public static Class<? extends Mapper> getImportMapperClass() {
+    return importMapperClass;
+  }
+
+  public static Class<? extends Mapper> getExportMapperClass() {
+    return exportMapperClass;
+  }
+
+  public static void setExportMapperClass(Class<? extends Mapper> clz) {
+    exportMapperClass = clz;
+  }
+
+  public static void setImportMapperClass(Class<? extends Mapper> clz) {
+    importMapperClass = clz;
+  }
+
+  public static Class<? extends Writable> getImportValueClass() {
+    return importValueClass;
+  }
+
+  public static void setImportValueClass(Class<? extends Writable> clz) {
+    importValueClass = clz;
+  }
+
+  /**
+   * Set the column type map to be used. (dependency injection for testing; not
+   * used in production.)
+   */
+  public void setColumnTypes(Map<String, Integer> colTypes) {
+    externalColTypes = colTypes;
+    LOG.debug("Using test-controlled type map");
+  }
+
+  public String getDatabaseTable() {
+    return dbTableName;
+  }
+
+  public String getHCatTableName() {
+    return hCatTableName;
+  }
+
+  public String getHCatDatabaseName() {
+    return hCatDatabaseName;
+  }
+
+  public String getQualifiedHCatTableName() {
+    return hCatQualifiedTableName;
+  }
+
+  public List<String> getHCatDynamicPartitionKeys() {
+    return hCatDynamicPartitionKeys;
+  }
+
+  public String getHCatStaticPartitionKey() {
+    return hCatStaticPartitionKey;
+  }
+
+  public String[] getDBColumnNames() {
+    return dbColumnNames;
+  }
+
+  public HCatSchema getHCatOutputSchema() {
+    return hCatOutputSchema;
+  }
+
+  public void setHCatOutputSchema(HCatSchema schema) {
+    hCatOutputSchema = schema;
+  }
+
+  public HCatSchema getHCatPartitionSchema() {
+    return hCatPartitionSchema;
+  }
+
+  public void setHCatPartitionSchema(HCatSchema schema) {
+    hCatPartitionSchema = schema;
+  }
+
+  public void setHCatStaticPartitionKey(String key) {
+    hCatStaticPartitionKey = key;
+  }
+
+  public void setHCatDynamicPartitionKeys(List<String> keys) {
+    hCatDynamicPartitionKeys = keys;
+  }
+
+  public String[] getDbColumnNames() {
+    return dbColumnNames;
+  }
+
+  public void setDbColumnNames(String[] names) {
+    dbColumnNames = names;
+  }
+
+  public Map<String, Integer> getDbColumnTypes() {
+    return dbColumnTypes;
+  }
+
+  public void setDbColumnTypes(Map<String, Integer> types) {
+    dbColumnTypes.putAll(types);
+  }
+
+  public String gethCatTableName() {
+    return hCatTableName;
+  }
+
+  public String gethCatDatabaseName() {
+    return hCatDatabaseName;
+  }
+
+  public String gethCatQualifiedTableName() {
+    return hCatQualifiedTableName;
+  }
+
+  public void setConfigured(boolean value) {
+    configured = value;
+  }
+
+  public static String sqlTypeString(int sqlType) {
+    switch (sqlType) {
+      case Types.BIT:
+        return "BIT";
+      case Types.TINYINT:
+        return "TINYINT";
+      case Types.SMALLINT:
+        return "SMALLINT";
+      case Types.INTEGER:
+        return "INTEGER";
+      case Types.BIGINT:
+        return "BIGINT";
+      case Types.FLOAT:
+        return "FLOAT";
+      case Types.REAL:
+        return "REAL";
+      case Types.DOUBLE:
+        return "DOUBLE";
+      case Types.NUMERIC:
+        return "NUMERIC";
+      case Types.DECIMAL:
+        return "DECIMAL";
+      case Types.CHAR:
+        return "CHAR";
+      case Types.VARCHAR:
+        return "VARCHAR";
+      case Types.LONGVARCHAR:
+        return "LONGVARCHAR";
+      case Types.DATE:
+        return "DATE";
+      case Types.TIME:
+        return "TIME";
+      case Types.TIMESTAMP:
+        return "TIMESTAMP";
+      case Types.BINARY:
+        return "BINARY";
+      case Types.VARBINARY:
+        return "VARBINARY";
+      case Types.LONGVARBINARY:
+        return "LONGVARBINARY";
+      case Types.NULL:
+        return "NULL";
+      case Types.OTHER:
+        return "OTHER";
+      case Types.JAVA_OBJECT:
+        return "JAVA_OBJECT";
+      case Types.DISTINCT:
+        return "DISTINCT";
+      case Types.STRUCT:
+        return "STRUCT";
+      case Types.ARRAY:
+        return "ARRAY";
+      case Types.BLOB:
+        return "BLOB";
+      case Types.CLOB:
+        return "CLOB";
+      case Types.REF:
+        return "REF";
+      case Types.DATALINK:
+        return "DATALINK";
+      case Types.BOOLEAN:
+        return "BOOLEAN";
+      case Types.ROWID:
+        return "ROWID";
+      case Types.NCHAR:
+        return "NCHAR";
+      case Types.NVARCHAR:
+        return "NVARCHAR";
+      case Types.LONGNVARCHAR:
+        return "LONGNVARCHAR";
+      case Types.NCLOB:
+        return "NCLOB";
+      case Types.SQLXML:
+        return "SQLXML";
+      default:
+        return "<UNKNOWN>";
+    }
+  }
+}
index 42f521f..01a55e5 100644 (file)
@@ -108,6 +108,13 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String HIVE_PARTITION_VALUE_ARG = "hive-partition-value";
   public static final String CREATE_HIVE_TABLE_ARG =
       "create-hive-table";
+  public static final String HCATALOG_TABLE_ARG = "hcatalog-table";
+  public static final String HCATALOG_DATABASE_ARG = "hcatalog-database";
+  public static final String CREATE_HCATALOG_TABLE_ARG =
+    "create-hcatalog-table";
+  public static final String HCATALOG_STORAGE_STANZA_ARG =
+    "hcatalog-storage-stanza";
+  public static final String HCATALOG_HOME_ARG = "hcatalog-home";
   public static final String MAPREDUCE_JOB_NAME = "mapreduce-job-name";
   public static final String NUM_MAPPERS_ARG = "num-mappers";
   public static final String NUM_MAPPERS_SHORT_ARG = "m";
@@ -488,6 +495,66 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
     return hiveOpts;
   }
 
+   /**
+   * @return options governing interaction with HCatalog.
+   */
+  protected RelatedOptions getHCatalogOptions() {
+    RelatedOptions hCatOptions = new RelatedOptions("HCatalog arguments");
+    hCatOptions.addOption(OptionBuilder
+      .hasArg()
+      .withDescription("HCatalog table name")
+      .withLongOpt(HCATALOG_TABLE_ARG)
+      .create());
+    hCatOptions.addOption(OptionBuilder
+      .hasArg()
+      .withDescription("HCatalog database name")
+      .withLongOpt(HCATALOG_DATABASE_ARG)
+      .create());
+
+    hCatOptions.addOption(OptionBuilder.withArgName("dir")
+      .hasArg().withDescription("Override $HIVE_HOME")
+      .withLongOpt(HIVE_HOME_ARG)
+      .create());
+    hCatOptions.addOption(OptionBuilder.withArgName("hdir")
+      .hasArg().withDescription("Override $HCAT_HOME")
+      .withLongOpt(HCATALOG_HOME_ARG)
+      .create());
+    hCatOptions.addOption(OptionBuilder.withArgName("partition-key")
+      .hasArg()
+      .withDescription("Sets the partition key to use when importing to hive")
+      .withLongOpt(HIVE_PARTITION_KEY_ARG)
+      .create());
+    hCatOptions.addOption(OptionBuilder.withArgName("partition-value")
+      .hasArg()
+      .withDescription("Sets the partition value to use when importing "
+        + "to hive")
+      .withLongOpt(HIVE_PARTITION_VALUE_ARG)
+      .create());
+    hCatOptions.addOption(OptionBuilder
+      .hasArg()
+      .withDescription("Override mapping for specific column to hive"
+        + " types.")
+      .withLongOpt(MAP_COLUMN_HIVE)
+      .create());
+
+    return hCatOptions;
+  }
+
+  protected RelatedOptions getHCatImportOnlyOptions() {
+    RelatedOptions hCatOptions = new RelatedOptions(
+      "HCatalog import specific options");
+    hCatOptions.addOption(OptionBuilder
+      .withDescription("Create HCatalog before import")
+      .withLongOpt(CREATE_HCATALOG_TABLE_ARG)
+      .create());
+    hCatOptions.addOption(OptionBuilder
+      .hasArg()
+      .withDescription("HCatalog storage stanza for table creation")
+      .withLongOpt(HCATALOG_STORAGE_STANZA_ARG)
+      .create());
+    return hCatOptions;
+  }
+
   /**
    * @return options governing output format delimiters
    */
@@ -826,7 +893,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
       out.setHiveTableName(in.getOptionValue(HIVE_TABLE_ARG));
     }
 
-    if(in.hasOption(HIVE_DATABASE_ARG)) {
+    if (in.hasOption(HIVE_DATABASE_ARG)) {
       out.setHiveDatabaseName(in.getOptionValue(HIVE_DATABASE_ARG));
     }
 
@@ -852,38 +919,79 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
     }
   }
 
+  protected void applyHCatOptions(CommandLine in, SqoopOptions out) {
+    if (in.hasOption(HCATALOG_TABLE_ARG)) {
+      out.setHCatTableName(in.getOptionValue(HCATALOG_TABLE_ARG));
+    }
+
+    if (in.hasOption(HCATALOG_DATABASE_ARG)) {
+      out.setHCatDatabaseName(in.getOptionValue(HCATALOG_DATABASE_ARG));
+    }
+
+    if (in.hasOption(HCATALOG_STORAGE_STANZA_ARG)) {
+      out.setHCatStorageStanza(in.getOptionValue(HCATALOG_STORAGE_STANZA_ARG));
+    }
+
+    if (in.hasOption(CREATE_HCATALOG_TABLE_ARG)) {
+      out.setCreateHCatalogTable(true);
+    }
+
+    if (in.hasOption(HCATALOG_HOME_ARG)) {
+      out.setHCatHome(in.getOptionValue(HCATALOG_HOME_ARG));
+    }
+
+    // Allow some of the hive options also
+
+    if (in.hasOption(HIVE_HOME_ARG)) {
+      out.setHiveHome(in.getOptionValue(HIVE_HOME_ARG));
+    }
+
+    if (in.hasOption(HIVE_PARTITION_KEY_ARG)) {
+      out.setHivePartitionKey(in.getOptionValue(HIVE_PARTITION_KEY_ARG));
+    }
+
+    if (in.hasOption(HIVE_PARTITION_VALUE_ARG)) {
+      out.setHivePartitionValue(in.getOptionValue(HIVE_PARTITION_VALUE_ARG));
+    }
+
+    if (in.hasOption(MAP_COLUMN_HIVE)) {
+      out.setMapColumnHive(in.getOptionValue(MAP_COLUMN_HIVE));
+    }
+  }
+
+
   protected void applyOutputFormatOptions(CommandLine in, SqoopOptions out)
       throws InvalidOptionsException {
     if (in.hasOption(FIELDS_TERMINATED_BY_ARG)) {
       out.setFieldsTerminatedBy(SqoopOptions.toChar(
           in.getOptionValue(FIELDS_TERMINATED_BY_ARG)));
-      out.setExplicitDelims(true);
+      out.setExplicitOutputDelims(true);
     }
 
     if (in.hasOption(LINES_TERMINATED_BY_ARG)) {
       out.setLinesTerminatedBy(SqoopOptions.toChar(
           in.getOptionValue(LINES_TERMINATED_BY_ARG)));
-      out.setExplicitDelims(true);
+      out.setExplicitOutputDelims(true);
     }
 
     if (in.hasOption(OPTIONALLY_ENCLOSED_BY_ARG)) {
       out.setEnclosedBy(SqoopOptions.toChar(
           in.getOptionValue(OPTIONALLY_ENCLOSED_BY_ARG)));
       out.setOutputEncloseRequired(false);
-      out.setExplicitDelims(true);
+      out.setExplicitOutputDelims(true);
     }
 
     if (in.hasOption(ENCLOSED_BY_ARG)) {
       out.setEnclosedBy(SqoopOptions.toChar(
           in.getOptionValue(ENCLOSED_BY_ARG)));
       out.setOutputEncloseRequired(true);
-      out.setExplicitDelims(true);
+      out.setExplicitOutputDelims(true);
     }
 
     if (in.hasOption(ESCAPED_BY_ARG)) {
       out.setEscapedBy(SqoopOptions.toChar(
           in.getOptionValue(ESCAPED_BY_ARG)));
-      out.setExplicitDelims(true);
+      out.setExplicitOutputDelims(true);
     }
 
     if (in.hasOption(MYSQL_DELIMITERS_ARG)) {
@@ -892,7 +1000,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
       out.setLinesTerminatedBy('\n');
       out.setEscapedBy('\\');
       out.setEnclosedBy('\'');
-      out.setExplicitDelims(true);
+      out.setExplicitOutputDelims(true);
     }
   }
 
@@ -901,28 +1009,33 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
     if (in.hasOption(INPUT_FIELDS_TERMINATED_BY_ARG)) {
       out.setInputFieldsTerminatedBy(SqoopOptions.toChar(
           in.getOptionValue(INPUT_FIELDS_TERMINATED_BY_ARG)));
+      out.setExplicitInputDelims(true);
     }
 
     if (in.hasOption(INPUT_LINES_TERMINATED_BY_ARG)) {
       out.setInputLinesTerminatedBy(SqoopOptions.toChar(
           in.getOptionValue(INPUT_LINES_TERMINATED_BY_ARG)));
+      out.setExplicitInputDelims(true);
     }
 
     if (in.hasOption(INPUT_OPTIONALLY_ENCLOSED_BY_ARG)) {
       out.setInputEnclosedBy(SqoopOptions.toChar(
           in.getOptionValue(INPUT_OPTIONALLY_ENCLOSED_BY_ARG)));
       out.setInputEncloseRequired(false);
+      out.setExplicitInputDelims(true);
     }
 
     if (in.hasOption(INPUT_ENCLOSED_BY_ARG)) {
       out.setInputEnclosedBy(SqoopOptions.toChar(
           in.getOptionValue(INPUT_ENCLOSED_BY_ARG)));
       out.setInputEncloseRequired(true);
+      out.setExplicitInputDelims(true);
     }
 
     if (in.hasOption(INPUT_ESCAPED_BY_ARG)) {
       out.setInputEscapedBy(SqoopOptions.toChar(
           in.getOptionValue(INPUT_ESCAPED_BY_ARG)));
+      out.setExplicitInputDelims(true);
     }
   }
 
@@ -1021,7 +1134,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   protected void validateOutputFormatOptions(SqoopOptions options)
       throws InvalidOptionsException {
     if (options.doHiveImport()) {
-      if (!options.explicitDelims()) {
+      if (!options.explicitOutputDelims()) {
         // user hasn't manually specified delimiters, and wants to import
         // straight to Hive. Use Hive-style delimiters.
         LOG.info("Using Hive-specific delimiters for output. You can override");
@@ -1050,6 +1163,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
               + " option." + HELP_STR);
     }
 
+    // Make sure that one of hCatalog or hive jobs are used
+    String hCatTable = options.getHCatTableName();
+    if (hCatTable != null && options.doHiveImport()) {
+      throw new InvalidOptionsException("The " + HCATALOG_TABLE_ARG
+        + " option conflicts with the " + HIVE_IMPORT_ARG
+        + " option." + HELP_STR);
+    }
+
     if(options.doHiveImport()
         && options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
       throw new InvalidOptionsException("Hive import is not compatible with "
@@ -1083,16 +1204,19 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
     }
 
     // Warn about using hive specific arguments without hive import itself
+    // In HCatalog support some of the Hive options are reused
     if (!options.doHiveImport()
       && ((options.getHiveHome() != null
-            && !options.getHiveHome().equals(SqoopOptions.getHiveHomeDefault()))
+        && !options.getHiveHome().
+          equals(SqoopOptions.getHiveHomeDefault())
+          && hCatTable == null))
         || options.doOverwriteHiveTable()
         || options.doFailIfHiveTableExists()
         || (options.getHiveTableName() != null
             && !options.getHiveTableName().equals(options.getTableName()))
-        || options.getHivePartitionKey() != null
-        || options.getHivePartitionValue() != null
-        || options.getMapColumnHive().size() > 0)) {
+        || (options.getHivePartitionKey() != null && hCatTable == null)
+        || (options.getHivePartitionValue() != null && hCatTable == null)
+        || (options.getMapColumnHive().size() > 0 && hCatTable == null)) {
       LOG.warn("It seems that you've specified at least one of following:");
       LOG.warn("\t--hive-home");
       LOG.warn("\t--hive-overwrite");
@@ -1105,6 +1229,89 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
       LOG.warn("those arguments will not be used in this session. Either");
       LOG.warn("specify --hive-import to apply them correctly or remove them");
       LOG.warn("from command line to remove this warning.");
+      LOG.info("Please note that --hive-home, --hive-partition-key, ");
+      LOG.info("\t hive-partition-value and --map-column-hive options are ");
+      LOG.info("\t are also valid for HCatalog imports and exports");
+    }
+  }
+
+  protected void validateHCatalogOptions(SqoopOptions options)
+    throws InvalidOptionsException {
+    // Make sure that one of hCatalog or hive jobs are used
+    String hCatTable = options.getHCatTableName();
+    if (hCatTable == null) {
+      if (options.getHCatHome() != null && !options.getHCatHome().
+        equals(SqoopOptions.getHCatHomeDefault())) {
+        LOG.warn("--hcatalog-home option will be ignored in "
+          + "non-HCatalog jobs");
+      }
+      if (options.getHCatDatabaseName() != null) {
+        LOG.warn("--hcatalog-database option will be ignored  "
+          + "without --hcatalog-table");
+      }
+
+      if (options.getHCatStorageStanza() != null) {
+        LOG.warn("--hcatalog-storage-stanza option will be ignored "
+          + "without --hatalog-table");
+      }
+      return;
+    }
+
+    if (options.explicitInputDelims()) {
+      LOG.warn("Input field/record delimiter options are not "
+        + "used in HCatalog jobs unless the format is text.   It is better "
+        + "to use --hive-import in those cases.  For text formats");
+    }
+    if (options.explicitOutputDelims()
+      || options.getHiveDelimsReplacement() != null
+      || options.doHiveDropDelims()) {
+      LOG.warn("Output field/record delimiter options are not useful"
+        + " in HCatalog jobs for most of the output types except text based "
+        + " formats is text. It is better "
+        + "to use --hive-import in those cases.  For non text formats, ");
+    }
+    if (options.doHiveImport()) {
+      throw new InvalidOptionsException("The " + HCATALOG_TABLE_ARG
+        + " option conflicts with the " + HIVE_IMPORT_ARG
+        + " option." + HELP_STR);
+    }
+    if (options.getTargetDir() != null) {
+      throw new InvalidOptionsException("The " + TARGET_DIR_ARG
+        + " option conflicts with the " + HCATALOG_TABLE_ARG
+        + " option." + HELP_STR);
+    }
+    if (options.getWarehouseDir() != null) {
+      throw new InvalidOptionsException("The " + WAREHOUSE_DIR_ARG
+        + " option conflicts with the " + HCATALOG_TABLE_ARG
+        + " option." + HELP_STR);
+    }
+    if (options.isDirect()) {
+      throw new InvalidOptionsException("Direct import is incompatible with "
+        + "HCatalog. Please remove the parameter --direct");
+    }
+    if (options.isAppendMode()) {
+      throw new InvalidOptionsException("Append mode for imports is not "
+        + " compatible with HCatalog. Please remove the parameter"
+        + "--append-mode");
+    }
+    if (options.getExportDir() != null) {
+      throw new InvalidOptionsException("The " + EXPORT_PATH_ARG
+        + " option conflicts with the " + HCATALOG_TABLE_ARG
+        + " option." + HELP_STR);
+    }
+
+    if (options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
+      throw new InvalidOptionsException("HCatalog job is not compatible with "
+        + " AVRO format option " + FMT_AVRODATAFILE_ARG
+        + " option." + HELP_STR);
+
+    }
+
+    if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
+      throw new InvalidOptionsException("HCatalog job  is not compatible with "
+        + "SequenceFile format option " + FMT_SEQUENCEFILE_ARG
+        + " option." + HELP_STR);
+
     }
   }
 
index dd34a97..c1ea881 100644 (file)
@@ -160,6 +160,7 @@ public class CodeGenTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     toolOptions.addUniqueOptions(getOutputFormatOptions());
     toolOptions.addUniqueOptions(getInputFormatOptions());
     toolOptions.addUniqueOptions(getHiveOptions(true));
+    toolOptions.addUniqueOptions(getHCatalogOptions());
   }
 
   @Override
@@ -188,6 +189,7 @@ public class CodeGenTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     applyInputFormatOptions(in, out);
     applyCodeGenOptions(in, out, false);
     applyHiveOptions(in, out);
+    applyHCatOptions(in, out);
   }
 
   @Override
@@ -203,6 +205,7 @@ public class CodeGenTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     validateCodeGenOptions(options);
     validateOutputFormatOptions(options);
     validateHiveOptions(options);
+    validateHCatalogOptions(options);
 
     if (options.getTableName() == null
      && options.getSqlQuery() == null) {
index 215addd..4c7d00c 100644 (file)
@@ -215,6 +215,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         .create());
 
     toolOptions.addUniqueOptions(codeGenOpts);
+    toolOptions.addUniqueOptions(getHCatalogOptions());
   }
 
   @Override
@@ -291,6 +292,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       applyInputFormatOptions(in, out);
       applyOutputFormatOptions(in, out);
       applyCodeGenOptions(in, out, false);
+      applyHCatOptions(in, out);
     } catch (NumberFormatException nfe) {
       throw new InvalidOptionsException("Error: expected numeric argument.\n"
           + "Try --help for usage.");
@@ -307,9 +309,11 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       throw new InvalidOptionsException(
           "Export requires a --table or a --call argument."
           + HELP_STR);
-    } else if (options.getExportDir() == null) {
+    } else if (options.getExportDir() == null
+      && options.getHCatTableName() == null) {
       throw new InvalidOptionsException(
-          "Export requires an --export-dir argument."
+          "Export requires an --export-dir argument or "
+          + "--hcatalog-table argument."
           + HELP_STR);
     } else if (options.getExistingJarName() != null
         && options.getClassName() == null) {
@@ -382,6 +386,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     validateOutputFormatOptions(options);
     validateCommonOptions(options);
     validateCodeGenOptions(options);
+    validateHCatalogOptions(options);
   }
 
   private void applyNewUpdateOptions(CommandLine in, SqoopOptions out)
index 2627726..424d9ec 100644 (file)
@@ -653,6 +653,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     toolOptions.addUniqueOptions(getInputFormatOptions());
     toolOptions.addUniqueOptions(getHiveOptions(true));
     toolOptions.addUniqueOptions(getHBaseOptions());
+    toolOptions.addUniqueOptions(getHCatalogOptions());
+    toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
 
     // get common codegen opts.
     RelatedOptions codeGenOpts = getCodeGenOpts(allTables);
@@ -676,7 +678,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       System.out.println("At minimum, you must specify --connect");
     } else {
       System.out.println(
-          "At minimum, you must specify --connect and --table");
+        "At minimum, you must specify --connect and --table");
     }
 
     System.out.println(
@@ -819,6 +821,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       applyInputFormatOptions(in, out);
       applyCodeGenOptions(in, out, allTables);
       applyHBaseOptions(in, out);
+      applyHCatOptions(in, out);
+
     } catch (NumberFormatException nfe) {
       throw new InvalidOptionsException("Error: expected numeric argument.\n"
           + "Try --help for usage.");
@@ -892,7 +896,12 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         != SqoopOptions.IncrementalMode.None && options.isValidationEnabled()) {
       throw new InvalidOptionsException("Validation is not supported for "
         + "incremental imports but single table only.");
-    }
+    } else if ((options.getTargetDir() != null
+      || options.getWarehouseDir() != null)
+      && options.getHCatTableName() != null) {
+      throw new InvalidOptionsException("--hcatalog-table cannot be used "
+        + " --warehouse-dir or --target-dir options");
+     }
   }
 
   /**
@@ -936,6 +945,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     validateOutputFormatOptions(options);
     validateHBaseOptions(options);
     validateHiveOptions(options);
+    validateHCatalogOptions(options);
   }
 }
 
index 0a41408..b5710e0 100644 (file)
@@ -117,7 +117,7 @@ public class ExportStressTest extends Configured implements Tool {
     options.setNumMappers(4);
     options.setLinesTerminatedBy('\n');
     options.setFieldsTerminatedBy(',');
-    options.setExplicitDelims(true);
+    options.setExplicitOutputDelims(true);
 
     SqoopTool exportTool = new ExportTool();
     Sqoop sqoop = new Sqoop(exportTool, getConf(), options);
index 06f7122..7e361d2 100644 (file)
@@ -18,6 +18,9 @@
 
 package com.cloudera.sqoop;
 
+import org.apache.sqoop.hcat.HCatalogExportTest;
+import org.apache.sqoop.hcat.HCatalogImportTest;
+
 import com.cloudera.sqoop.hbase.HBaseImportTest;
 import com.cloudera.sqoop.hbase.HBaseQueryImportTest;
 import com.cloudera.sqoop.hbase.HBaseUtilTest;
@@ -71,6 +74,10 @@ public final class ThirdPartyTests extends TestCase {
     suite.addTestSuite(HBaseQueryImportTest.class);
     suite.addTestSuite(HBaseUtilTest.class);
 
+    // HCatalog
+    suite.addTestSuite(HCatalogImportTest.class);
+    suite.addTestSuite(HCatalogExportTest.class);
+
     return suite;
   }
 
index 462ccf1..9c47bad 100644 (file)
@@ -51,6 +51,16 @@ public class TestHiveImport extends ImportJobTestCase {
   public static final Log LOG = LogFactory.getLog(
       TestHiveImport.class.getName());
 
+  public void setUp() {
+    super.setUp();
+    HiveImport.setTestMode(true);
+  }
+
+  public void tearDown() {
+    super.tearDown();
+    HiveImport.setTestMode(false);
+  }
+
   /**
    * Sets the expected number of columns in the table being manipulated
    * by the test. Under the hood, this sets the expected column names
index cf41b96..d6afbc8 100644 (file)
@@ -413,7 +413,7 @@ public abstract class BaseSqoopTestCase extends TestCase {
   protected void removeTableDir() {
     File tableDirFile = new File(getTablePath().toString());
     if (tableDirFile.exists()) {
-      // Remove the director where the table will be imported to,
+      // Remove the directory where the table will be imported to,
       // prior to running the MapReduce job.
       if (!DirUtil.deleteDir(tableDirFile)) {
         LOG.warn("Could not delete table directory: "
index e13f3df..4421f0c 100644 (file)
@@ -26,7 +26,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.cloudera.sqoop.SqoopOptions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +33,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 
 import com.cloudera.sqoop.Sqoop;
+import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
 import com.cloudera.sqoop.tool.ExportTool;
 
@@ -113,7 +113,7 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
         }
       }
     }
-
+    boolean isHCatJob = false;
     // The sqoop-specific additional args are then added.
     if (null != additionalArgv) {
       boolean prevIsFlag = false;
@@ -126,6 +126,9 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
           continue;
         } else {
           // normal argument.
+          if (!isHCatJob && arg.equals("--hcatalog-table")) {
+            isHCatJob = true;
+          }
           args.add(arg);
         }
       }
@@ -135,8 +138,11 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
       args.add("--table");
       args.add(getTableName());
     }
-    args.add("--export-dir");
-    args.add(getTablePath().toString());
+    // Only add export-dir if hcatalog-table is not there in additional argv
+    if (!isHCatJob) {
+      args.add("--export-dir");
+      args.add(getTablePath().toString());
+    }
     args.add("--connect");
     args.add(getConnectString());
     args.add("--fields-terminated-by");
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
new file mode 100644 (file)
index 0000000..77bafcc
--- /dev/null
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
+import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
+import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.junit.Before;
+
+import com.cloudera.sqoop.testutil.ExportJobTestCase;
+
+/**
+ * Test that we can export HCatalog tables into databases.
+ */
+public class HCatalogExportTest extends ExportJobTestCase {
+  private static final Log LOG =
+    LogFactory.getLog(HCatalogExportTest.class);
+  private HCatalogTestUtils utils = HCatalogTestUtils.instance();
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    try {
+      utils.initUtils();
+    } catch (Exception e) {
+      throw new RuntimeException("Error initializing HCatTestUtilis", e);
+    }
+  }
+  /**
+   * @return an argv for the CodeGenTool to use when creating tables to export.
+   */
+  protected String[] getCodeGenArgv(String... extraArgs) {
+    List<String> codeGenArgv = new ArrayList<String>();
+
+    if (null != extraArgs) {
+      for (String arg : extraArgs) {
+        codeGenArgv.add(arg);
+      }
+    }
+
+    codeGenArgv.add("--table");
+    codeGenArgv.add(getTableName());
+    codeGenArgv.add("--connect");
+    codeGenArgv.add(getConnectString());
+    codeGenArgv.add("--hcatalog-table");
+    codeGenArgv.add(getTableName());
+
+    return codeGenArgv.toArray(new String[0]);
+  }
+
+  /**
+   * Verify that for the max and min values of the 'id' column, the values for a
+   * given column meet the expected values.
+   */
+  protected void assertColMinAndMax(String colName, ColumnGenerator generator)
+    throws SQLException {
+    Connection conn = getConnection();
+    int minId = getMinRowId(conn);
+    int maxId = getMaxRowId(conn);
+    String table = getTableName();
+    LOG.info("Checking min/max for column " + colName + " with type "
+      + SqoopHCatUtilities.sqlTypeString(generator.getSqlType()));
+
+    Object expectedMin = generator.getDBValue(minId);
+    Object expectedMax = generator.getDBValue(maxId);
+
+    utils.assertSqlColValForRowId(conn, table, minId, colName, expectedMin);
+    utils.assertSqlColValForRowId(conn, table, maxId, colName, expectedMax);
+  }
+
+  private void runHCatExport(List<String> addlArgsArray,
+    final int totalRecords, String table,
+    ColumnGenerator[] cols) throws Exception {
+    utils.createHCatTable(CreateMode.CREATE_AND_LOAD,
+      totalRecords, table, cols);
+    utils.createSqlTable(getConnection(), true, totalRecords, table, cols);
+    Map<String, String> addlArgsMap = utils.getAddlTestArgs();
+    addlArgsArray.add("--verbose");
+    addlArgsArray.add("-m");
+    addlArgsArray.add("1");
+    addlArgsArray.add("--hcatalog-table");
+    addlArgsArray.add(table);
+    String[] argv = {};
+
+    if (addlArgsMap.containsKey("-libjars")) {
+      argv = new String[2];
+      argv[0] = "-libjars";
+      argv[1] = addlArgsMap.get("-libjars");
+    }
+    for (String k : addlArgsMap.keySet()) {
+      if (!k.equals("-libjars")) {
+        addlArgsArray.add(k);
+        addlArgsArray.add(addlArgsMap.get(k));
+      }
+    }
+    String[] exportArgs = getArgv(true, 10, 10, newStrArray(argv,
+      addlArgsArray.toArray(new String[0])));
+    LOG.debug("Export args = " + Arrays.toString(exportArgs));
+    SqoopHCatUtilities.instance().setConfigured(false);
+    runExport(exportArgs);
+    verifyExport(totalRecords);
+    for (int i = 0; i < cols.length; i++) {
+      assertColMinAndMax(HCatalogTestUtils.forIdx(i), cols[i]);
+    }
+  }
+
+  public void testIntTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "boolean", Types.BOOLEAN, HCatFieldSchema.Type.BOOLEAN,
+        Boolean.TRUE, Boolean.TRUE, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "tinyint", Types.INTEGER, HCatFieldSchema.Type.INT, 10,
+        10, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "smallint", Types.INTEGER, HCatFieldSchema.Type.INT, 100,
+        100, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(3),
+        "int", Types.INTEGER, HCatFieldSchema.Type.INT, 1000,
+        1000, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(4),
+        "bigint", Types.BIGINT, HCatFieldSchema.Type.BIGINT, 10000L,
+        10000L, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testFloatTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "float", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 10.0F,
+        10.F, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "real", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 20.0F,
+        20.0F, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "double", Types.DOUBLE, HCatFieldSchema.Type.DOUBLE, 30.0D,
+        30.0D, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testNumberTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "numeric(18,2)", Types.NUMERIC, HCatFieldSchema.Type.STRING, "1000",
+        new BigDecimal("1000"), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "decimal(18,2)", Types.DECIMAL, HCatFieldSchema.Type.STRING, "2000",
+        new BigDecimal("2000"), KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testDateTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "date", Types.DATE, HCatFieldSchema.Type.STRING, "2013-12-31",
+        new Date(113, 11, 31), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "time", Types.TIME, HCatFieldSchema.Type.STRING, "10:11:12",
+        new Time(10, 11, 12), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.STRING,
+        "2013-12-31 10:11:12", new Timestamp(113, 11, 31, 10, 11, 12, 0),
+        KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testDateTypesToBigInt() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    long offset = TimeZone.getDefault().getRawOffset();
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "date", Types.DATE, HCatFieldSchema.Type.BIGINT, 0 - offset,
+        new Date(70, 0, 1), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "time", Types.TIME, HCatFieldSchema.Type.BIGINT, 36672000L - offset,
+        new Time(10, 11, 12), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.BIGINT,
+        36672000L - offset, new Timestamp(70, 0, 1, 10, 11, 12, 0),
+        KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--map-column-hive");
+    addlArgsArray.add("COL0=bigint,COL1=bigint,COL2=bigint");
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testStringTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "char(10)", Types.CHAR, HCatFieldSchema.Type.STRING, "string to test",
+        "string to test", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "longvarchar", Types.LONGVARCHAR, HCatFieldSchema.Type.STRING,
+        "string to test", "string to test", KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+
+  public void testBinaryTypes() throws Exception {
+    ByteBuffer bb = ByteBuffer.wrap(new byte[] { 0, 1, 2 });
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "binary(10)", Types.BINARY, HCatFieldSchema.Type.BINARY,
+        bb.array(), bb.array(), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varbinary(10)", Types.BINARY, HCatFieldSchema.Type.BINARY,
+        bb.array(), bb.array(), KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testColumnProjection() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", null, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--columns");
+    addlArgsArray.add("ID,MSG");
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+
+  }
+  public void testStaticPartitioning() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.STATIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testDynamicPartitioning() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.DYNAMIC_KEY),
+    };
+
+    List<String> addlArgsArray = new ArrayList<String>();
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testStaicAndDynamicPartitioning() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.STATIC_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "2", "2", KeyType.DYNAMIC_KEY),
+    };
+
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  /**
+   * Test other file formats.
+   */
+  public void testSequenceFile() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+            "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+            "1", KeyType.STATIC_KEY),
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+            "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+            "2", KeyType.DYNAMIC_KEY), };
+
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    utils.setStorageInfo(HCatalogTestUtils.STORED_AS_SEQFILE);
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+
+  public void testTextFile() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+            "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+            "1", KeyType.STATIC_KEY),
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+            "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+            "2", KeyType.DYNAMIC_KEY), };
+
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT);
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+  }
+}
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
new file mode 100644 (file)
index 0000000..293015e
--- /dev/null
@@ -0,0 +1,712 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
+import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
+import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.junit.Before;
+
+import com.cloudera.sqoop.Sqoop;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.cloudera.sqoop.tool.SqoopTool;
+
+/**
+ * Test that we can export HCatalog tables into databases.
+ */
+public class HCatalogImportTest extends ImportJobTestCase {
+  private static final Log LOG =
+    LogFactory.getLog(HCatalogImportTest.class);
+  private final HCatalogTestUtils utils = HCatalogTestUtils.instance();
+  private List<String> extraTestArgs = null;
+  private List<String> configParams = null;
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+    try {
+      utils.initUtils();
+      extraTestArgs = new ArrayList<String>();
+      configParams = new ArrayList<String>();
+    } catch (Exception e) {
+      throw new RuntimeException("Error initializing HCatTestUtilis", e);
+    }
+  }
+
+  /**
+   * @return an argv for the CodeGenTool to use when creating tables to export.
+   */
+  protected String[] getCodeGenArgv(String... extraArgs) {
+    List<String> codeGenArgv = new ArrayList<String>();
+
+    if (null != extraArgs) {
+      for (String arg : extraArgs) {
+        codeGenArgv.add(arg);
+      }
+    }
+
+    codeGenArgv.add("--table");
+    codeGenArgv.add(getTableName());
+    codeGenArgv.add("--connect");
+    codeGenArgv.add(getConnectString());
+    codeGenArgv.add("--hcatalog-table");
+    codeGenArgv.add(getTableName());
+
+    return codeGenArgv.toArray(new String[0]);
+  }
+
+  private void setExtraArgs(List<String> args) {
+    extraTestArgs.clear();
+    if (args != null && args.size() > 0) {
+      extraTestArgs.addAll(args);
+    }
+  }
+
+  private List<String> getConfigParams() {
+    return configParams;
+  }
+
+  private void setConfigParams(List<String> params) {
+    configParams.clear();
+    if (params != null && params.size() > 0) {
+      configParams.addAll(params);
+    }
+  }
+  @Override
+  protected List<String> getExtraArgs(Configuration conf) {
+    List<String> addlArgsArray = new ArrayList<String>();
+    if (extraTestArgs != null && extraTestArgs.size() > 0) {
+      addlArgsArray.addAll(extraTestArgs);
+    }
+    Map<String, String> addlArgsMap = utils.getAddlTestArgs();
+    String[] argv = {};
+
+    if (addlArgsMap.containsKey("-libjars")) {
+      argv = new String[2];
+      argv[0] = "-libjars";
+      argv[1] = addlArgsMap.get("-libjars");
+    }
+    addlArgsArray.add("-m");
+    addlArgsArray.add("1");
+    addlArgsArray.add("--hcatalog-table");
+    addlArgsArray.add(getTableName());
+    for (String k : addlArgsMap.keySet()) {
+      if (!k.equals("-libjars")) {
+        addlArgsArray.add(k);
+        addlArgsArray.add(addlArgsMap.get(k));
+      }
+    }
+    return addlArgsArray;
+  }
+
+  @Override
+  protected String[] getArgv(boolean includeHadoopFlags, String[] colNames,
+    Configuration conf) {
+    if (null == colNames) {
+      colNames = getColNames();
+    }
+    String columnsString = "";
+    String splitByCol = null;
+    if (colNames != null) {
+      splitByCol = colNames[0];
+      for (String col : colNames) {
+        columnsString += col + ",";
+      }
+    }
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      CommonArgs.addHadoopFlags(args);
+    }
+    args.addAll(getConfigParams());
+    args.add("--table");
+    args.add(getTableName());
+    if (colNames != null) {
+      args.add("--columns");
+      args.add(columnsString);
+      args.add("--split-by");
+      args.add(splitByCol);
+    }
+    args.add("--hcatalog-table");
+    args.add(getTableName());
+    args.add("--connect");
+    args.add(getConnectString());
+    args.addAll(getExtraArgs(conf));
+
+    return args.toArray(new String[0]);
+  }
+
+  private void validateHCatRecords(final List<HCatRecord> recs,
+    final HCatSchema schema, int expectedCount,
+    ColumnGenerator... cols) throws IOException {
+    if (recs.size() != expectedCount) {
+      fail("Expected records = " + expectedCount
+        + ", actual = " + recs.size());
+      return;
+    }
+    schema.getFieldNames();
+    Collections.sort(recs, new Comparator<HCatRecord>()
+    {
+      @Override
+      public int compare(HCatRecord hr1, HCatRecord hr2) {
+        try {
+          return hr1.getInteger("id", schema)
+            - hr2.getInteger("id", schema);
+        } catch (Exception e) {
+          LOG.warn("Exception caught while sorting hcat records " + e);
+        }
+        return 0;
+      }
+    });
+
+    Object expectedVal = null;
+    Object actualVal = null;
+    for (int i = 0; i < recs.size(); ++i) {
+      HCatRecord rec = recs.get(i);
+      expectedVal = i;
+      actualVal = rec.get("id", schema);
+      LOG.info("Validating field: id (expected = "
+        + expectedVal + ", actual = " + actualVal + ")");
+      HCatalogTestUtils.assertEquals(expectedVal, actualVal);
+      expectedVal = "textfield" + i;
+      actualVal = rec.get("msg", schema);
+      LOG.info("Validating field: msg (expected = "
+        + expectedVal + ", actual = " + actualVal + ")");
+      HCatalogTestUtils.assertEquals(rec.get("msg", schema), "textfield" + i);
+      for (ColumnGenerator col : cols) {
+        String name = col.getName().toLowerCase();
+        expectedVal = col.getHCatValue(i);
+        actualVal = rec.get(name, schema);
+        LOG.info("Validating field: " + name + " (expected = "
+          + expectedVal + ", actual = " + actualVal + ")");
+        HCatalogTestUtils.assertEquals(expectedVal, actualVal);
+      }
+    }
+  }
+
+  protected void runImport(SqoopTool tool, String[] argv) throws IOException {
+    // run the tool through the normal entry-point.
+    int ret;
+    try {
+      Configuration conf = getConf();
+      SqoopOptions opts = getSqoopOptions(conf);
+      Sqoop sqoop = new Sqoop(tool, conf, opts);
+      ret = Sqoop.runSqoop(sqoop, argv);
+    } catch (Exception e) {
+      LOG.error("Got exception running import: " + e.toString());
+      e.printStackTrace();
+      ret = 1;
+    }
+    if (0 != ret) {
+      throw new IOException("Import failure; return status " + ret);
+    }
+  }
+
+  private void runHCatImport(List<String> addlArgsArray,
+    int totalRecords, String table, ColumnGenerator[] cols,
+    String[] cNames) throws Exception {
+    runHCatImport(addlArgsArray, totalRecords, table, cols, cNames, false);
+  }
+
+  private void runHCatImport(List<String> addlArgsArray,
+    int totalRecords, String table, ColumnGenerator[] cols,
+    String[] cNames, boolean dontCreate) throws Exception {
+    CreateMode mode = CreateMode.CREATE;
+    if (dontCreate) {
+      mode = CreateMode.NO_CREATION;
+    }
+    HCatSchema tblSchema =
+      utils.createHCatTable(mode, totalRecords, table, cols);
+    utils.createSqlTable(getConnection(), false, totalRecords, table, cols);
+    Map<String, String> addlArgsMap = utils.getAddlTestArgs();
+    String[] argv = {};
+    addlArgsArray.add("-m");
+    addlArgsArray.add("1");
+    addlArgsArray.add("--hcatalog-table");
+    addlArgsArray.add(table);
+    if (addlArgsMap.containsKey("-libjars")) {
+      argv = new String[2];
+      argv[0] = "-libjars";
+      argv[1] = addlArgsMap.get("-libjars");
+    }
+    for (String k : addlArgsMap.keySet()) {
+      if (!k.equals("-libjars")) {
+        addlArgsArray.add(k);
+        addlArgsArray.add(addlArgsMap.get(k));
+      }
+    }
+    String[] colNames = null;
+    if (cNames != null) {
+      colNames = cNames;
+    } else {
+      colNames = new String[2 + cols.length];
+      colNames[0] = "ID";
+      colNames[1] = "MSG";
+      for (int i = 0; i < cols.length; ++i) {
+        colNames[2 + i] = cols[i].getName().toUpperCase();
+      }
+    }
+    String[] importArgs = getArgv(true, colNames, new Configuration());
+    LOG.debug("Import args = " + Arrays.toString(importArgs));
+    SqoopHCatUtilities.instance().setConfigured(false);
+    runImport(new ImportTool(), importArgs);
+    List<HCatRecord> recs = utils.readHCatRecords(null, table, null);
+    LOG.debug("HCat records ");
+    LOG.debug(utils.hCatRecordDump(recs, tblSchema));
+    validateHCatRecords(recs, tblSchema, 10, cols);
+  }
+
+  public void testIntTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "boolean", Types.BOOLEAN, HCatFieldSchema.Type.BOOLEAN,
+        Boolean.TRUE, Boolean.TRUE, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "tinyint", Types.INTEGER, HCatFieldSchema.Type.INT, 10,
+        10, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "smallint", Types.INTEGER, HCatFieldSchema.Type.INT, 100,
+        100, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(3),
+        "int", Types.INTEGER, HCatFieldSchema.Type.INT, 1000,
+        1000, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(4),
+        "bigint", Types.BIGINT, HCatFieldSchema.Type.BIGINT, 10000L,
+        10000L, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testFloatTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "float", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 10.0F,
+        10.F, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "real", Types.FLOAT, HCatFieldSchema.Type.FLOAT, 20.0F,
+        20.0F, KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "double", Types.DOUBLE, HCatFieldSchema.Type.DOUBLE, 30.0D,
+        30.0D, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testNumberTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "numeric(18,2)", Types.NUMERIC, HCatFieldSchema.Type.STRING, "1000",
+        new BigDecimal("1000"), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "decimal(18,2)", Types.DECIMAL, HCatFieldSchema.Type.STRING, "2000",
+        new BigDecimal("2000"), KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testDateTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "date", Types.DATE, HCatFieldSchema.Type.STRING, "2013-12-31",
+        new Date(113, 11, 31), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "time", Types.TIME, HCatFieldSchema.Type.STRING, "10:11:12",
+        new Time(10, 11, 12), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.STRING,
+        "2013-12-31 10:11:12.0", new Timestamp(113, 11, 31, 10, 11, 12, 0),
+        KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testDateTypesToBigInt() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    long offset = TimeZone.getDefault().getRawOffset();
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "date", Types.DATE, HCatFieldSchema.Type.BIGINT, 0 - offset,
+        new Date(70, 0, 1), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "time", Types.TIME, HCatFieldSchema.Type.BIGINT, 36672000L - offset,
+        new Time(10, 11, 12), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(2),
+        "timestamp", Types.TIMESTAMP, HCatFieldSchema.Type.BIGINT,
+        36672000L - offset, new Timestamp(70, 0, 1, 10, 11, 12, 0),
+        KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--map-column-hive");
+    addlArgsArray.add("COL0=bigint,COL1=bigint,COL2=bigint");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testStringTypes() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "char(10)", Types.CHAR, HCatFieldSchema.Type.STRING, "string to test",
+        "string to test", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "longvarchar", Types.LONGVARCHAR, HCatFieldSchema.Type.STRING,
+        "string to test", "string to test", KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testBinaryTypes() throws Exception {
+    ByteBuffer bb = ByteBuffer.wrap(new byte[] { 0, 1, 2 });
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "binary(10)", Types.BINARY, HCatFieldSchema.Type.BINARY,
+        bb.array(), bb.array(), KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "longvarbinary", Types.BINARY, HCatFieldSchema.Type.BINARY,
+        bb.array(), bb.array(), KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testColumnProjection() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        null, null, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    List<String> cfgParams = new ArrayList<String>();
+    cfgParams.add("-D");
+    cfgParams.add(SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP
+      + "=true");
+    setConfigParams(cfgParams);
+    String[] colNames = new String[] { "ID", "MSG" };
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, colNames);
+  }
+
+  public void testColumnProjectionMissingPartKeys() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        null, null, KeyType.DYNAMIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    List<String> cfgParams = new ArrayList<String>();
+    cfgParams.add("-D");
+    cfgParams.add(SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP
+      + "=true");
+    setConfigParams(cfgParams);
+    String[] colNames = new String[] { "ID", "MSG" };
+    try {
+      runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, colNames);
+      fail("Column projection with missing dynamic partition keys must fail");
+    } catch (Throwable t) {
+      LOG.info("Job fails as expected : " + t);
+      StringWriter sw = new StringWriter();
+      t.printStackTrace(new PrintWriter(sw));
+      LOG.info("Exception stack trace = " + sw);
+    }
+  }
+  public void testStaticPartitioning() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.STATIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testDynamicPartitioning() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.DYNAMIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testStaicAndDynamicPartitioning() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.STATIC_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "2", "2", KeyType.DYNAMIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  /**
+   * Test other file formats.
+   */
+  public void testSequenceFile() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+            "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+            "1", KeyType.STATIC_KEY),
+        HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+            "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+            "2", KeyType.DYNAMIC_KEY), };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    setExtraArgs(addlArgsArray);
+    utils.setStorageInfo(HCatalogTestUtils.STORED_AS_SEQFILE);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testTextFile() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.STATIC_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "2", "2", KeyType.DYNAMIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col0");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("1");
+    setExtraArgs(addlArgsArray);
+    utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testTableCreation() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.STATIC_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "2", "2", KeyType.DYNAMIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--create-hcatalog-table");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+  }
+
+  public void testTableCreationWithPartition() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "2", "2", KeyType.STATIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col1");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("2");
+    addlArgsArray.add("--create-hcatalog-table");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+  }
+
+  public void testTableCreationWithStorageStanza() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "1", "1", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "2", "2", KeyType.STATIC_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-partition-key");
+    addlArgsArray.add("col1");
+    addlArgsArray.add("--hive-partition-value");
+    addlArgsArray.add("2");
+    addlArgsArray.add("--create-hcatalog-table");
+    addlArgsArray.add("--hcatalog-storage-stanza");
+    addlArgsArray.add(HCatalogTestUtils.STORED_AS_TEXT);
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+  }
+
+  public void testHiveDropDelims() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "Test", "\u0001\n\rTest", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "Test2", "\u0001\r\nTest2", KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-drop-import-delims");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testHiveDelimsReplacement() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "^^^Test", "\u0001\n\rTest", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING,
+        "^^^Test2", "\u0001\r\nTest2", KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--hive-delims-replacement");
+    addlArgsArray.add("^");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testDynamicKeyInMiddle() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+        "1", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+        "2", KeyType.DYNAMIC_KEY), };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    utils.setStorageInfo(HCatalogTestUtils.STORED_AS_SEQFILE);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+
+  public void testCreateTableWithPreExistingTable() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "1",
+        "1", KeyType.NOT_A_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, "2",
+        "2", KeyType.DYNAMIC_KEY), };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--create-hcatalog-table");
+    setExtraArgs(addlArgsArray);
+    try {
+      // Precreate table
+      utils.createHCatTable(CreateMode.CREATE, TOTAL_RECORDS, table, cols);
+      runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true);
+      fail("HCatalog job with --create-hcatalog-table and pre-existing"
+        + " table should fail");
+    } catch (Exception e) {
+      LOG.debug("Caught expected exception while running "
+        + " create-hcatalog-table with pre-existing table test", e);
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java b/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
new file mode 100644 (file)
index 0000000..ddae5f5
--- /dev/null
@@ -0,0 +1,855 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.junit.Assert;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
+import com.cloudera.sqoop.testutil.CommonArgs;
+
+/**
+ * HCatalog common test utilities.
+ *
+ */
+public final class HCatalogTestUtils {
+  protected Configuration conf;
+  private static List<HCatRecord> recsToLoad = new ArrayList<HCatRecord>();
+  private static List<HCatRecord> recsRead = new ArrayList<HCatRecord>();
+  private static final Log LOG = LogFactory.getLog(HCatalogTestUtils.class);
+  private FileSystem fs;
+  private final SqoopHCatUtilities utils = SqoopHCatUtilities.instance();
+  private static final double DELTAVAL = 1e-10;
+  public static final String SQOOP_HCATALOG_TEST_ARGS =
+    "sqoop.hcatalog.test.args";
+  private final boolean initialized = false;
+  private static String storageInfo = null;
+  public static final String STORED_AS_RCFILE = "stored as\n\trcfile\n";
+  public static final String STORED_AS_SEQFILE = "stored as\n\tsequencefile\n";
+  public static final String STORED_AS_TEXT = "stored as\n\ttextfile\n";
+
+  private HCatalogTestUtils() {
+  }
+
+  private static final class Holder {
+    @SuppressWarnings("synthetic-access")
+    private static final HCatalogTestUtils INSTANCE = new HCatalogTestUtils();
+
+    private Holder() {
+    }
+  }
+
+  @SuppressWarnings("synthetic-access")
+  public static HCatalogTestUtils instance() {
+    return Holder.INSTANCE;
+  }
+
+  public void initUtils() throws IOException, MetaException {
+    if (initialized) {
+      return;
+    }
+    conf = new Configuration();
+    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    fs = FileSystem.get(conf);
+    fs.initialize(fs.getWorkingDirectory().toUri(), conf);
+    storageInfo = null;
+    SqoopHCatUtilities.setTestMode(true);
+  }
+
+  public static String getStorageInfo() {
+    if (null != storageInfo && storageInfo.length() > 0) {
+      return storageInfo;
+    } else {
+      return STORED_AS_RCFILE;
+    }
+  }
+
+  public void setStorageInfo(String info) {
+    storageInfo = info;
+  }
+
+  private static String getDropTableCmd(final String dbName,
+    final String tableName) {
+    return "DROP TABLE IF EXISTS " + dbName.toLowerCase() + "."
+      + tableName.toLowerCase();
+  }
+
+  private static String getHCatCreateTableCmd(String dbName,
+    String tableName, List<HCatFieldSchema> tableCols,
+    List<HCatFieldSchema> partKeys) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("create table ").append(dbName.toLowerCase()).append('.');
+    sb.append(tableName.toLowerCase()).append(" (\n\t");
+    for (int i = 0; i < tableCols.size(); ++i) {
+      HCatFieldSchema hfs = tableCols.get(i);
+      if (i > 0) {
+        sb.append(",\n\t");
+      }
+      sb.append(hfs.getName().toLowerCase());
+      sb.append(' ').append(hfs.getTypeString());
+    }
+    sb.append(")\n");
+    if (partKeys != null && partKeys.size() > 0) {
+      sb.append("partitioned by (\n\t");
+      for (int i = 0; i < partKeys.size(); ++i) {
+        HCatFieldSchema hfs = partKeys.get(i);
+        if (i > 0) {
+          sb.append("\n\t,");
+        }
+        sb.append(hfs.getName().toLowerCase());
+        sb.append(' ').append(hfs.getTypeString());
+      }
+      sb.append(")\n");
+    }
+    sb.append(getStorageInfo());
+    LOG.info("Create table command : " + sb);
+    return sb.toString();
+  }
+
+  /**
+   * The record writer mapper for HCatalog tables that writes records from an in
+   * memory list.
+   */
+  public void createHCatTableUsingSchema(String dbName,
+    String tableName, List<HCatFieldSchema> tableCols,
+    List<HCatFieldSchema> partKeys)
+    throws Exception {
+
+    String databaseName = dbName == null
+      ? SqoopHCatUtilities.DEFHCATDB : dbName;
+    LOG.info("Dropping HCatalog table if it exists " + databaseName
+      + '.' + tableName);
+    String dropCmd = getDropTableCmd(databaseName, tableName);
+
+    try {
+      utils.launchHCatCli(dropCmd);
+    } catch (Exception e) {
+      LOG.debug("Drop hcatalog table exception : " + e);
+      LOG.info("Unable to drop table." + dbName + "."
+        + tableName + ".   Assuming it did not exist");
+    }
+    LOG.info("Creating HCatalog table if it exists " + databaseName
+      + '.' + tableName);
+    String createCmd = getHCatCreateTableCmd(databaseName, tableName,
+      tableCols, partKeys);
+    utils.launchHCatCli(createCmd);
+    LOG.info("Created HCatalog table " + dbName + "." + tableName);
+  }
+
+  /**
+   * The record writer mapper for HCatalog tables that writes records from an in
+   * memory list.
+   */
+  public static class HCatWriterMapper extends
+    Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+    private static int writtenRecordCount = 0;
+
+    public static int getWrittenRecordCount() {
+      return writtenRecordCount;
+    }
+
+    public static void setWrittenRecordCount(int count) {
+      HCatWriterMapper.writtenRecordCount = count;
+    }
+
+    @Override
+    public void map(LongWritable key, Text value,
+      Context context)
+      throws IOException, InterruptedException {
+      try {
+        HCatRecord rec = recsToLoad.get(writtenRecordCount);
+        context.write(null, rec);
+        writtenRecordCount++;
+      } catch (Exception e) {
+        if (LOG.isDebugEnabled()) {
+          e.printStackTrace(System.err);
+        }
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * The record reader mapper for HCatalog tables that reads records into an in
+   * memory list.
+   */
+  public static class HCatReaderMapper extends
+    Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+
+    private static int readRecordCount = 0; // test will be in local mode
+
+    public static int getReadRecordCount() {
+      return readRecordCount;
+    }
+
+    public static void setReadRecordCount(int count) {
+      HCatReaderMapper.readRecordCount = count;
+    }
+
+    @Override
+    public void map(WritableComparable key, HCatRecord value,
+      Context context) throws IOException, InterruptedException {
+      try {
+        recsRead.add(value);
+        readRecordCount++;
+      } catch (Exception e) {
+        if (LOG.isDebugEnabled()) {
+          e.printStackTrace(System.err);
+        }
+        throw new IOException(e);
+      }
+    }
+  }
+
+  private void createInputFile(Path path, int rowCount)
+    throws IOException {
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+    FSDataOutputStream os = fs.create(path);
+    for (int i = 0; i < rowCount; i++) {
+      String s = i + "\n";
+      os.writeChars(s);
+    }
+    os.close();
+  }
+
+  public List<HCatRecord> loadHCatTable(String dbName,
+    String tableName, Map<String, String> partKeyMap,
+    HCatSchema tblSchema, List<HCatRecord> records)
+    throws Exception {
+
+    Job job = new Job(conf, "HCat load job");
+
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(HCatWriterMapper.class);
+
+
+    // Just writ 10 lines to the file to drive the mapper
+    Path path = new Path(fs.getWorkingDirectory(),
+      "mapreduce/HCatTableIndexInput");
+
+    job.getConfiguration()
+      .setInt(ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
+    int writeCount = records.size();
+    recsToLoad.clear();
+    recsToLoad.addAll(records);
+    createInputFile(path, writeCount);
+    // input/output settings
+    HCatWriterMapper.setWrittenRecordCount(0);
+
+    FileInputFormat.setInputPaths(job, path);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(HCatOutputFormat.class);
+    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
+      partKeyMap);
+
+    HCatOutputFormat.setOutput(job, outputJobInfo);
+    HCatOutputFormat.setSchema(job, tblSchema);
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+    job.setNumReduceTasks(0);
+    SqoopHCatUtilities.addJars(job, new SqoopOptions());
+    boolean success = job.waitForCompletion(true);
+
+    if (!success) {
+      throw new IOException("Loading HCatalog table with test records failed");
+    }
+    utils.invokeOutputCommitterForLocalMode(job);
+    LOG.info("Loaded " + HCatWriterMapper.writtenRecordCount + " records");
+    return recsToLoad;
+  }
+
+  /**
+   * Run a local map reduce job to read records from HCatalog table.
+   * @param readCount
+   * @param filter
+   * @return
+   * @throws Exception
+   */
+  public List<HCatRecord> readHCatRecords(String dbName,
+    String tableName, String filter) throws Exception {
+
+    HCatReaderMapper.setReadRecordCount(0);
+    recsRead.clear();
+
+    // Configuration conf = new Configuration();
+    Job job = new Job(conf, "HCatalog reader job");
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(HCatReaderMapper.class);
+    job.getConfiguration()
+      .setInt(ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
+    // input/output settings
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatInputFormat.setInput(job, dbName, tableName).setFilter(filter);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setNumReduceTasks(0);
+
+    Path path = new Path(fs.getWorkingDirectory(),
+      "mapreduce/HCatTableIndexOutput");
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+    FileOutputFormat.setOutputPath(job, path);
+
+    job.waitForCompletion(true);
+    LOG.info("Read " + HCatReaderMapper.readRecordCount + " records");
+
+    return recsRead;
+  }
+
+  /**
+   * An enumeration type to hold the partition key type of the ColumnGenerator
+   * defined columns.
+   */
+  public enum KeyType {
+    NOT_A_KEY,
+    STATIC_KEY,
+    DYNAMIC_KEY
+  };
+
+  /**
+   * An enumeration type to hold the creation mode of the HCatalog table.
+   */
+  public enum CreateMode {
+    NO_CREATION,
+    CREATE,
+    CREATE_AND_LOAD,
+  };
+
+  /**
+   * When generating data for export tests, each column is generated according
+   * to a ColumnGenerator.
+   */
+  public interface ColumnGenerator {
+    /*
+     * The column name
+     */
+    String getName();
+
+    /**
+     * For a row with id rowNum, what should we write into that HCatalog column
+     * to export?
+     */
+    Object getHCatValue(int rowNum);
+
+    /**
+     * For a row with id rowNum, what should the database return for the given
+     * column's value?
+     */
+    Object getDBValue(int rowNum);
+
+    /** Return the column type to put in the CREATE TABLE statement. */
+    String getDBTypeString();
+
+    /** Return the SqlType for this column. */
+    int getSqlType();
+
+    /** Return the HCat type for this column. */
+    HCatFieldSchema.Type getHCatType();
+
+
+    /**
+     * If the field is a partition key, then whether is part of the static
+     * partitioning specification in imports or exports. Only one key can be a
+     * static partitioning key. After the first column marked as static, rest of
+     * the keys will be considered dynamic even if they are marked static.
+     */
+    KeyType getKeyType();
+  }
+
+  /**
+   * Return the column name for a column index. Each table contains two columns
+   * named 'id' and 'msg', and then an arbitrary number of additional columns
+   * defined by ColumnGenerators. These columns are referenced by idx 0, 1, 2
+   * and on.
+   * @param idx
+   *          the index of the ColumnGenerator in the array passed to
+   *          createTable().
+   * @return the name of the column
+   */
+  public static String forIdx(int idx) {
+    return "col" + idx;
+  }
+
+  public static ColumnGenerator colGenerator(final String name,
+    final String dbType, final int sqlType,
+    final HCatFieldSchema.Type hCatType, final Object hCatValue,
+    final Object dbValue, final KeyType keyType) {
+    return new ColumnGenerator() {
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public Object getDBValue(int rowNum) {
+        return dbValue;
+      }
+
+      @Override
+      public Object getHCatValue(int rowNum) {
+        return hCatValue;
+      }
+
+      @Override
+      public String getDBTypeString() {
+        return dbType;
+      }
+
+      @Override
+      public int getSqlType() {
+        return sqlType;
+      }
+
+      @Override
+      public HCatFieldSchema.Type getHCatType() {
+        return hCatType;
+      }
+
+      public KeyType getKeyType() {
+        return keyType;
+      }
+
+    };
+  }
+
+  public static void assertEquals(Object expectedVal,
+    Object actualVal) {
+
+    if (expectedVal != null && expectedVal instanceof byte[]) {
+      Assert
+        .assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+    } else {
+      if (expectedVal instanceof Float) {
+        if (actualVal instanceof Double) {
+          Assert.assertEquals(((Float) expectedVal).floatValue(),
+            ((Double) actualVal).doubleValue(), DELTAVAL);
+        } else {
+          Assert
+            .assertEquals("Got unexpected column value", expectedVal,
+              actualVal);
+        }
+      } else if (expectedVal instanceof Double) {
+        if (actualVal instanceof Float) {
+          Assert.assertEquals(((Double) expectedVal).doubleValue(),
+            ((Float) actualVal).doubleValue(), DELTAVAL);
+        } else {
+          Assert
+            .assertEquals("Got unexpected column value", expectedVal,
+              actualVal);
+        }
+      } else {
+        Assert
+          .assertEquals("Got unexpected column value", expectedVal,
+            actualVal);
+      }
+    }
+  }
+
+  /**
+   * Verify that on a given row, a column has a given value.
+   *
+   * @param id
+   *          the id column specifying the row to test.
+   */
+  public void assertSqlColValForRowId(Connection conn,
+    String table, int id, String colName,
+    Object expectedVal) throws SQLException {
+    LOG.info("Verifying column " + colName + " has value " + expectedVal);
+
+    PreparedStatement statement = conn.prepareStatement(
+      "SELECT " + colName + " FROM " + table + " WHERE id = " + id,
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    Object actualVal = null;
+    try {
+      ResultSet rs = statement.executeQuery();
+      try {
+        rs.next();
+        actualVal = rs.getObject(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    assertEquals(expectedVal, actualVal);
+  }
+
+  /**
+   * Verify that on a given row, a column has a given value.
+   *
+   * @param id
+   *          the id column specifying the row to test.
+   */
+  public static void assertHCatColValForRowId(List<HCatRecord> recs,
+    HCatSchema schema, int id, String fieldName,
+    Object expectedVal) throws IOException {
+    LOG.info("Verifying field " + fieldName + " has value " + expectedVal);
+
+    Object actualVal = null;
+    for (HCatRecord rec : recs) {
+      if (rec.getInteger("id", schema).equals(id)) {
+        actualVal = rec.get(fieldName, schema);
+        break;
+      }
+    }
+    if (actualVal == null) {
+      throw new IOException("No record found with id = " + id);
+    }
+    if (expectedVal != null && expectedVal instanceof byte[]) {
+      Assert
+        .assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+    } else {
+      if (expectedVal instanceof Float) {
+        if (actualVal instanceof Double) {
+          Assert.assertEquals(((Float) expectedVal).floatValue(),
+            ((Double) actualVal).doubleValue(), DELTAVAL);
+        } else {
+          Assert
+            .assertEquals("Got unexpected column value", expectedVal,
+              actualVal);
+        }
+      } else if (expectedVal instanceof Double) {
+        if (actualVal instanceof Float) {
+          Assert.assertEquals(((Double) expectedVal).doubleValue(),
+            ((Float) actualVal).doubleValue(), DELTAVAL);
+        } else {
+          Assert
+            .assertEquals("Got unexpected column value", expectedVal,
+              actualVal);
+        }
+      } else {
+        Assert
+          .assertEquals("Got unexpected column value", expectedVal,
+            actualVal);
+      }
+    }
+  }
+
+  /**
+   * Return a SQL statement that drops a table, if it exists.
+   *
+   * @param tableName
+   *          the table to drop.
+   * @return the SQL statement to drop that table.
+   */
+  public static String getSqlDropTableStatement(String tableName) {
+    return "DROP TABLE " + tableName + " IF EXISTS";
+  }
+
+  public static String getSqlCreateTableStatement(String tableName,
+    ColumnGenerator... extraCols) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(tableName);
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    int colNum = 0;
+    for (ColumnGenerator gen : extraCols) {
+      sb.append(", " + forIdx(colNum++) + " " + gen.getDBTypeString());
+    }
+    sb.append(")");
+    String cmd = sb.toString();
+    LOG.debug("Generated SQL create table command : " + cmd);
+    return cmd;
+  }
+
+  public static String getSqlInsertTableStatement(String tableName,
+    ColumnGenerator... extraCols) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("INSERT INTO ");
+    sb.append(tableName);
+    sb.append(" (id, msg");
+    int colNum = 0;
+    for (ColumnGenerator gen : extraCols) {
+      sb.append(", " + forIdx(colNum++));
+    }
+    sb.append(") VALUES ( ?, ?");
+    for (int i = 0; i < extraCols.length; ++i) {
+      sb.append(",?");
+    }
+    sb.append(")");
+    String s = sb.toString();
+    LOG.debug("Generated SQL insert table command : " + s);
+    return s;
+  }
+
+  public void createSqlTable(Connection conn, boolean generateOnly,
+    int count, String table, ColumnGenerator... extraCols)
+    throws Exception {
+    PreparedStatement statement = conn.prepareStatement(
+      getSqlDropTableStatement(table),
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+    statement = conn.prepareStatement(
+      getSqlCreateTableStatement(table, extraCols),
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+    if (!generateOnly) {
+      loadSqlTable(conn, table, count, extraCols);
+    }
+  }
+
+  public HCatSchema createHCatTable(CreateMode mode, int count,
+    String table, ColumnGenerator... extraCols)
+    throws Exception {
+    HCatSchema hCatTblSchema = generateHCatTableSchema(extraCols);
+    HCatSchema hCatPartSchema = generateHCatPartitionSchema(extraCols);
+    HCatSchema hCatFullSchema = new HCatSchema(hCatTblSchema.getFields());
+    for (HCatFieldSchema hfs : hCatPartSchema.getFields()) {
+      hCatFullSchema.append(hfs);
+    }
+    if (mode != CreateMode.NO_CREATION) {
+
+      createHCatTableUsingSchema(null, table,
+        hCatTblSchema.getFields(), hCatPartSchema.getFields());
+      if (mode == CreateMode.CREATE_AND_LOAD) {
+        HCatSchema hCatLoadSchema = new HCatSchema(hCatTblSchema.getFields());
+        HCatSchema dynPartSchema =
+          generateHCatDynamicPartitionSchema(extraCols);
+        for (HCatFieldSchema hfs : dynPartSchema.getFields()) {
+          hCatLoadSchema.append(hfs);
+        }
+        loadHCatTable(hCatLoadSchema, table, count, extraCols);
+      }
+    }
+    return hCatFullSchema;
+  }
+
+  private void loadHCatTable(HCatSchema hCatSchema, String table,
+    int count, ColumnGenerator... extraCols)
+    throws Exception {
+    Map<String, String> staticKeyMap = new HashMap<String, String>();
+    for (ColumnGenerator col : extraCols) {
+      if (col.getKeyType() == KeyType.STATIC_KEY) {
+        staticKeyMap.put(col.getName(), (String) col.getHCatValue(0));
+      }
+    }
+    loadHCatTable(null, table, staticKeyMap,
+      hCatSchema, generateHCatRecords(count, hCatSchema, extraCols));
+  }
+
+  private void loadSqlTable(Connection conn, String table, int count,
+    ColumnGenerator... extraCols) throws Exception {
+    PreparedStatement statement = conn.prepareStatement(
+      getSqlInsertTableStatement(table, extraCols),
+      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      for (int i = 0; i < count; ++i) {
+        statement.setObject(1, i, Types.INTEGER);
+        statement.setObject(2, "textfield" + i, Types.VARCHAR);
+        for (int j = 0; j < extraCols.length; ++j) {
+          statement.setObject(j + 3, extraCols[j].getDBValue(i),
+            extraCols[j].getSqlType());
+        }
+        statement.executeUpdate();
+      }
+      if (!conn.getAutoCommit()) {
+        conn.commit();
+      }
+    } finally {
+      statement.close();
+    }
+  }
+
+  private HCatSchema generateHCatTableSchema(ColumnGenerator... extraCols)
+    throws Exception {
+    List<HCatFieldSchema> hCatTblCols = new ArrayList<HCatFieldSchema>();
+    hCatTblCols.clear();
+    hCatTblCols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, ""));
+    hCatTblCols
+      .add(new HCatFieldSchema("msg", HCatFieldSchema.Type.STRING, ""));
+    for (ColumnGenerator gen : extraCols) {
+      if (gen.getKeyType() == KeyType.NOT_A_KEY) {
+        hCatTblCols
+          .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+      }
+    }
+    HCatSchema hCatTblSchema = new HCatSchema(hCatTblCols);
+    return hCatTblSchema;
+  }
+
+  private HCatSchema generateHCatPartitionSchema(ColumnGenerator... extraCols)
+    throws Exception {
+    List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
+
+    for (ColumnGenerator gen : extraCols) {
+      if (gen.getKeyType() != KeyType.NOT_A_KEY) {
+        hCatPartCols
+          .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+      }
+    }
+    HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
+    return hCatPartSchema;
+  }
+
+  private HCatSchema generateHCatDynamicPartitionSchema(
+    ColumnGenerator... extraCols) throws Exception {
+    List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
+    hCatPartCols.clear();
+    boolean staticFound = false;
+    for (ColumnGenerator gen : extraCols) {
+      if (gen.getKeyType() != KeyType.NOT_A_KEY) {
+        if (gen.getKeyType() == KeyType.STATIC_KEY && !staticFound) {
+          staticFound = true;
+          continue;
+        }
+        hCatPartCols
+          .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+      }
+    }
+    HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
+    return hCatPartSchema;
+
+  }
+
+  private HCatSchema generateHCatStaticPartitionSchema(
+    ColumnGenerator... extraCols) throws Exception {
+    List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
+    hCatPartCols.clear();
+    for (ColumnGenerator gen : extraCols) {
+      if (gen.getKeyType() == KeyType.STATIC_KEY) {
+        hCatPartCols
+          .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
+        break;
+      }
+    }
+    HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
+    return hCatPartSchema;
+  }
+
+  private List<HCatRecord> generateHCatRecords(int numRecords,
+    HCatSchema hCatTblSchema, ColumnGenerator... extraCols) throws Exception {
+    List<HCatRecord> records = new ArrayList<HCatRecord>();
+    List<HCatFieldSchema> hCatTblCols = hCatTblSchema.getFields();
+    int size = hCatTblCols.size();
+    for (int i = 0; i < numRecords; ++i) {
+      DefaultHCatRecord record = new DefaultHCatRecord(size);
+      record.set(hCatTblCols.get(0).getName(), hCatTblSchema, i);
+      record.set(hCatTblCols.get(1).getName(), hCatTblSchema, "textfield" + i);
+      boolean staticFound = false;
+      int idx = 0;
+      for (int j = 0; j < extraCols.length; ++j) {
+        if (extraCols[j].getKeyType() == KeyType.STATIC_KEY
+          && !staticFound) {
+          staticFound = true;
+          continue;
+        }
+        record.set(hCatTblCols.get(idx + 2).getName(), hCatTblSchema,
+          extraCols[j].getHCatValue(i));
+        ++idx;
+      }
+
+      records.add(record);
+    }
+    return records;
+  }
+
+  public String hCatRecordDump(List<HCatRecord> recs,
+    HCatSchema schema) throws Exception {
+    List<String> fields = schema.getFieldNames();
+    int count = 0;
+    StringBuilder sb = new StringBuilder(1024);
+    for (HCatRecord rec : recs) {
+      sb.append("HCat Record : " + ++count).append('\n');
+      for (String field : fields) {
+        sb.append('\t').append(field).append('=');
+        sb.append(rec.get(field, schema)).append('\n');
+        sb.append("\n\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  public Map<String, String> getAddlTestArgs() {
+    String addlArgs = System.getProperty(SQOOP_HCATALOG_TEST_ARGS);
+    Map<String, String> addlArgsMap = new HashMap<String, String>();
+    if (addlArgs != null) {
+      String[] argsArray = addlArgs.split(",");
+      for (String s : argsArray) {
+        String[] keyVal = s.split("=");
+        if (keyVal.length == 2) {
+          addlArgsMap.put(keyVal[0], keyVal[1]);
+        } else {
+          LOG.info("Ignoring malformed addl arg " + s);
+        }
+      }
+    }
+    return addlArgsMap;
+  }
+}
diff --git a/src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java b/src/test/org/apache/sqoop/hcat/TestHCatalogBasic.java
new file mode 100644 (file)
index 0000000..da803d0
--- /dev/null
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hcat;
+
+import junit.framework.TestCase;
+
+import org.junit.Before;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.tool.ExportTool;
+import com.cloudera.sqoop.tool.ImportTool;
+
+/**
+ * Test basic HCatalog related features.
+ */
+public class TestHCatalogBasic extends TestCase {
+  private static ImportTool importTool;
+  private static ExportTool exportTool;
+
+  @Before
+  @Override
+  public void setUp() {
+    importTool = new ImportTool();
+    exportTool = new ExportTool();
+  }
+  private SqoopOptions parseImportArgs(String[] argv) throws Exception {
+    SqoopOptions opts = importTool.parseArguments(argv, null, null, false);
+    return opts;
+  }
+
+  private SqoopOptions parseExportArgs(String[] argv) throws Exception {
+    SqoopOptions opts = exportTool.parseArguments(argv, null, null, false);
+    return opts;
+  }
+
+  public void testHCatalogHomeWithImport() throws Exception {
+    String[] args = {
+      "--hcatalog-home",
+      "/usr/lib/hcatalog",
+    };
+
+    SqoopOptions opts = parseImportArgs(args);
+  }
+
+  public void testHCatalogHomeWithExport() throws Exception {
+    String[] args = {
+      "--hcatalog-home",
+      "/usr/lib/hcatalog",
+    };
+
+    SqoopOptions opts = parseExportArgs(args);
+  }
+
+  public void testHCatalogImport() throws Exception {
+    String[] args = {
+      "--hcatalog-table",
+      "table",
+    };
+
+    SqoopOptions opts = parseImportArgs(args);
+  }
+
+  public void testHCatalogExport() throws Exception {
+    String[] args = {
+      "--hcatalog-table",
+      "table",
+    };
+
+    SqoopOptions opts = parseExportArgs(args);
+  }
+
+  public void testHCatImportWithTargetDir() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--target-dir",
+      "/target/dir",
+    };
+    try {
+      SqoopOptions opts = parseImportArgs(args);
+      importTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testHCatImportWithWarehouseDir() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--warehouse-dir",
+      "/target/dir",
+    };
+    try {
+      SqoopOptions opts = parseImportArgs(args);
+      importTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testHCatImportWithHiveImport() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--hive-import",
+    };
+    try {
+      SqoopOptions opts = parseImportArgs(args);
+      importTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testHCatExportWithExportDir() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--export-dir",
+      "/export/dir",
+    };
+    try {
+      SqoopOptions opts = parseExportArgs(args);
+      exportTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testHCatImportWithDirect() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--direct",
+    };
+    try {
+      SqoopOptions opts = parseImportArgs(args);
+      importTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testHCatImportWithSequenceFile() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--as-sequencefile"
+    };
+    try {
+      SqoopOptions opts = parseImportArgs(args);
+      importTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+
+  public void testHCatImportWithAvroFile() throws Exception {
+    String[] args = {
+      "--connect",
+      "jdbc:db:url",
+      "--table",
+      "dbtable",
+      "--hcatalog-table",
+      "table",
+      "--as-avrofile"
+    };
+    try {
+      SqoopOptions opts = parseImportArgs(args);
+      importTool.validateOptions(opts);
+      fail("Expected InvalidOptionsException");
+    } catch (SqoopOptions.InvalidOptionsException ioe) {
+      // expected.
+    }
+  }
+  public void testHCatImportWithCreateTable() throws Exception {
+    String[] args = {
+      "--hcatalog-table",
+      "table",
+      "--create-hcatalog-table",
+    };
+    SqoopOptions opts = parseImportArgs(args);
+  }
+
+  public void testHCatImportWithStorageStanza() throws Exception {
+    String[] args = {
+      "--hcatalog-table",
+      "table",
+      "--hcatalog-storage-stanza",
+      "stored as textfile",
+    };
+    SqoopOptions opts = parseImportArgs(args);
+  }
+
+  public void testHCatImportWithDatabase() throws Exception {
+    String[] args = {
+      "--hcatalog-table",
+      "table",
+      "--hcatalog-database",
+      "default",
+    };
+    SqoopOptions opts = parseImportArgs(args);
+  }
+}
diff --git a/testdata/hcatalog/conf/hive-log4j.properties b/testdata/hcatalog/conf/hive-log4j.properties
new file mode 100644 (file)
index 0000000..7fa0546
--- /dev/null
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+
+sqoop.root.logger=DEBUG,console,DRFA
+hive.root.logger=DEBUG,console,DRFA
+hcatalog.root.logger=DEBUG,console,DRFA
+sqoop.log.dir=${user.dir}/sqoop/logs
+hive.log.dir=${user.dir}/sqoop/logs/
+sqoop.log.file=sqoop.log
+hive.log.file=hive.log
+org.apache.sqoop=DEBUG, console
+org.apache.hadoop=DEBUG, console
+org.apache.hive=DEBUG, console
+org.apache.hcatalog=DEBUG, console
+
+# Define the root logger to the system property "sqoop.root.logger".
+log4j.rootLogger=${sqoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=WARN
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
+
+
+log4j.category.DataNucleus=INFO,DRFA
+log4j.category.Datastore=INFO,DRFA
+log4j.category.Datastore.Schema=INFO,DRFA
+log4j.category.JPOX.Datastore=INFO,DRFA
+log4j.category.JPOX.Plugin=INFO,DRFA
+log4j.category.JPOX.MetaData=INFO,DRFA
+log4j.category.JPOX.Query=INFO,DRFA
+log4j.category.JPOX.General=INFO,DRFA
+log4j.category.JPOX.Enhancer=INFO,DRFA
+log4j.logger.org.apache.hadoop.conf.Configuration=INFO,DRFA
+
diff --git a/testdata/hcatalog/conf/hive-site.xml b/testdata/hcatalog/conf/hive-site.xml
new file mode 100644 (file)
index 0000000..c84af28
--- /dev/null
@@ -0,0 +1,26 @@
+<configuration>
+  <property>
+    <name>hive.metastore.local</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>${test.build.data}/sqoop/warehouse</value>
+  </property>
+  <property>
+    <name>hive.metastore.uris</name>
+    <value></value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    <value>jdbc:derby:;databaseName=${test.build.data}/sqoop/metastore_db;create=true</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+  </property>
+  <property>
+    <name>hive.querylog.location</name>
+    <value>${test.build.data}/sqoop/logs</value>
+  </property>
+</configuration>
diff --git a/testdata/hcatalog/conf/log4j.properties b/testdata/hcatalog/conf/log4j.properties
new file mode 100644 (file)
index 0000000..370fbfa
--- /dev/null
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.sqoop=DEBUG, console
+org.apache.hadoop=DEBUG, console
+org.apache.hive=DEBUG, console
+org.apache.hcatalog=DEBUG, console
+
+
+sqoop.root.logger=DEBUG,console,DRFA
+hive.root.logger=DEBUG,console,DRFA
+hcatalog.root.logger=DEBUG,console,DRFA
+sqoop.log.dir=${user.dir}/sqoop/logs
+sqoop.log.file=sqoop.log
+
+
+
+# Define the root logger to the system property "sqoop.root.logger".
+log4j.rootLogger=${sqoop.root.logger}
+
+#
+# DRFA
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${sqoop.log.dir}/${sqoop.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.DRFA.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n