SQOOP-318. Support splittable LZO files with Hive.
authorArvind Prabhakar <arvind@apache.org>
Tue, 23 Aug 2011 17:17:08 +0000 (17:17 +0000)
committerArvind Prabhakar <arvind@apache.org>
Tue, 23 Aug 2011 17:17:08 +0000 (17:17 +0000)
(Joey Echeverria via Arvind Prabhakar)

git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1160815 13f79535-47bb-0310-9956-ffa450edef68

src/docs/user/hive.txt
src/java/com/cloudera/sqoop/hive/HiveImport.java
src/java/com/cloudera/sqoop/hive/TableDefWriter.java
src/java/com/cloudera/sqoop/io/CodecMap.java
src/test/com/cloudera/sqoop/hive/TestTableDefWriter.java

index 059d7cb..2699d3e 100644 (file)
@@ -76,3 +76,12 @@ particular partition by specifying the +\--hive-partition-key+ and
 +\--hive-partition-value+ arguments.  The partition value must be a
 string.  Please see the Hive documentation for more details on
 partitioning.
+
+You can import compressed tables into Hive using the +\--compress+ and
++\--compression-codec+ options. One downside to compressing tables imported
+into Hive is that many codecs cannot be split for processing by parallel map
+tasks. The lzop codec, however, does support splitting. When importing tables
+with this codec, Sqoop will automatically index the files for splitting and
+configuring a new Hive table with the correct InputFormat. This feature
+currently requires that all partitions of a table be compressed with the lzop
+codec.
index 36c17ba..d363e9f 100644 (file)
@@ -35,11 +35,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.util.Executor;
 import com.cloudera.sqoop.util.ExitSecurityException;
 import com.cloudera.sqoop.util.LoggingAsyncSink;
 import com.cloudera.sqoop.util.SubprocessSecurityManager;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
 
 /**
  * Utility to import a table into the Hive metastore. Manages the connection
@@ -187,6 +191,23 @@ public class HiveImport {
     String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
     String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
 
+    if (!isGenerateOnly()) {
+      String codec = options.getCompressionCodec();
+      if (codec != null && (codec.equals(CodecMap.LZOP)
+              || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+        try {
+          String finalPathStr = tableWriter.getFinalPathStr();
+          Tool tool = ReflectionUtils.newInstance(Class.
+                  forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
+                  asSubclass(Tool.class), configuration);
+          ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+        } catch (Exception ex) {
+          LOG.error("Error indexing lzo files", ex);
+          throw new IOException("Error indexing lzo files", ex);
+        }
+      }
+    }
+
     // write them to a script file.
     File scriptFile = getScriptFile(outputTableName);
     try {
index 7dd9135..6de9dd6 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ConnManager;
 
 import java.io.File;
@@ -177,7 +178,16 @@ public class TableDefWriter {
     sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
     sb.append("' LINES TERMINATED BY '");
     sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
-    sb.append("' STORED AS TEXTFILE");
+    String codec = options.getCompressionCodec();
+    if (codec != null && (codec.equals(CodecMap.LZOP)
+            || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+      sb.append("' STORED AS INPUTFORMAT "
+              + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+      sb.append(" OUTPUTFORMAT "
+              + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+    } else {
+      sb.append("' STORED AS TEXTFILE");
+    }
 
     LOG.debug("Create statement: " + sb.toString());
     return sb.toString();
@@ -190,22 +200,7 @@ public class TableDefWriter {
    * @return the LOAD DATA statement to import the data in HDFS into hive.
    */
   public String getLoadDataStmt() throws IOException {
-    String warehouseDir = options.getWarehouseDir();
-    if (null == warehouseDir) {
-      warehouseDir = "";
-    } else if (!warehouseDir.endsWith(File.separator)) {
-      warehouseDir = warehouseDir + File.separator;
-    }
-
-    String tablePath;
-    if (null != inputTableName) {
-      tablePath = warehouseDir + inputTableName;
-    } else {
-      tablePath = options.getTargetDir();
-    }
-    FileSystem fs = FileSystem.get(configuration);
-    Path finalPath = new Path(tablePath).makeQualified(fs);
-    String finalPathStr = finalPath.toString();
+    String finalPathStr = getFinalPathStr();
 
     StringBuilder sb = new StringBuilder();
     sb.append("LOAD DATA INPATH '");
@@ -228,6 +223,25 @@ public class TableDefWriter {
     return sb.toString();
   }
 
+  public String getFinalPathStr() throws IOException {
+    String warehouseDir = options.getWarehouseDir();
+    if (null == warehouseDir) {
+      warehouseDir = "";
+    } else if (!warehouseDir.endsWith(File.separator)) {
+      warehouseDir = warehouseDir + File.separator;
+    }
+
+    String tablePath;
+    if (null != inputTableName) {
+      tablePath = warehouseDir + inputTableName;
+    } else {
+      tablePath = options.getTargetDir();
+    }
+    FileSystem fs = FileSystem.get(configuration);
+    Path finalPath = new Path(tablePath).makeQualified(fs);
+    return finalPath.toString();
+  }
+
   /**
    * Return a string identifying the character to use as a delimiter
    * in Hive, in octal representation.
index 8564164..e835e57 100644 (file)
@@ -40,6 +40,7 @@ public final class CodecMap {
   public static final String NONE = "none";
   public static final String DEFLATE = "deflate";
   public static final String LZO = "lzo";
+  public static final String LZOP = "lzop";
 
   private static Map<String, String> codecNames;
   static {
@@ -49,6 +50,7 @@ public final class CodecMap {
     codecNames.put(NONE,    null);
     codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
     codecNames.put(LZO,     "com.hadoop.compression.lzo.LzoCodec");
+    codecNames.put(LZOP,     "com.hadoop.compression.lzo.LzopCodec");
 
     // add more from Hadoop CompressionCodecFactory
     for (Class<? extends CompressionCodec> cls
index 43b755e..978e356 100644 (file)
@@ -106,4 +106,32 @@ public class TestTableDefWriter extends TestCase {
         + "LINES TERMINATED BY '\\012' STORED AS TEXTFILE", createTable);
     assertTrue(loadData.endsWith(" PARTITION (ds='20110413')"));
   }
+
+  public void testLzoSplitting() throws Exception {
+    String[] args = {
+        "--compress",
+        "--compression-codec", "lzop",
+    };
+    Configuration conf = new Configuration();
+    SqoopOptions options =
+      new ImportTool().parseArguments(args, null, null, false);
+    TableDefWriter writer = new TableDefWriter(options,
+        null, "inputTable", "outputTable", conf, false);
+
+    Map<String, Integer> colTypes = new HashMap<String, Integer>();
+    writer.setColumnTypes(colTypes);
+
+    String createTable = writer.getCreateTableStmt();
+    String loadData = writer.getLoadDataStmt();
+
+    assertNotNull(createTable);
+    assertNotNull(loadData);
+    assertEquals("CREATE TABLE IF NOT EXISTS `outputTable` ( ) "
+        + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\054' "
+        + "LINES TERMINATED BY '\\012' STORED AS "
+        + "INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' "
+        + "OUTPUTFORMAT "
+        + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'",
+        createTable);
+  }
 }