SQOOP-3318: Remove Kite dependency from test cases
authorBoglarka Egyed <bogi@apache.org>
Fri, 27 Apr 2018 08:38:10 +0000 (10:38 +0200)
committerBoglarka Egyed <bogi@apache.org>
Fri, 27 Apr 2018 08:38:10 +0000 (10:38 +0200)
(Szabolcs Vasas via Boglarka Egyed)

src/java/org/apache/sqoop/util/FileSystemUtil.java
src/test/org/apache/sqoop/TestAllTables.java
src/test/org/apache/sqoop/TestMerge.java
src/test/org/apache/sqoop/TestParquetExport.java
src/test/org/apache/sqoop/TestParquetImport.java
src/test/org/apache/sqoop/hive/TestHiveImport.java
src/test/org/apache/sqoop/util/ParquetReader.java [new file with mode: 0644]

index 1493e09..96ec212 100644 (file)
 package org.apache.sqoop.util;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 
 public final class FileSystemUtil {
   private FileSystemUtil() {
@@ -42,4 +48,18 @@ public final class FileSystemUtil {
 
     return path.getFileSystem(conf).makeQualified(path);
   }
+
+  public static boolean isFile(Path path, Configuration conf) throws IOException {
+    return path.getFileSystem(conf).isFile(path);
+  }
+
+  public static List<Path> listFiles(Path path, Configuration conf) throws IOException {
+    List<Path> result = new ArrayList<>();
+    FileSystem fileSystem = path.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(path, false);
+    while (files.hasNext()) {
+      result.add(files.next().getPath());
+    }
+    return result;
+  }
 }
index 56d1f57..16933a8 100644 (file)
@@ -23,12 +23,12 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.Before;
 import org.junit.After;
 
@@ -36,10 +36,8 @@ import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.ImportJobTestCase;
 import org.apache.sqoop.tool.ImportAllTablesTool;
 import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
 
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -180,7 +178,6 @@ public class TestAllTables extends ImportJobTestCase {
     int i = 0;
     for (String tableName : this.tableNames) {
       Path tablePath = new Path(warehousePath, tableName);
-      Dataset dataset = Datasets.load("dataset:file:" + tablePath);
 
       // dequeue the expected value for this table. This
       // list has the same order as the tableNames list.
@@ -188,16 +185,9 @@ public class TestAllTables extends ImportJobTestCase {
           + this.expectedStrings.get(0);
       this.expectedStrings.remove(0);
 
-      DatasetReader<GenericRecord> reader = dataset.newReader();
-      try {
-        GenericRecord record = reader.next();
-        String line = record.get(0) + "," + record.get(1);
-        assertEquals("Table " + tableName + " expected a different string",
-            expectedVal, line);
-        assertFalse(reader.hasNext());
-      } finally {
-        reader.close();
-      }
+      List<String> result = new ParquetReader(tablePath).readAllInCsv();
+      assertEquals("Table " + tableName + " expected a different string",
+          singletonList(expectedVal), result);
     }
   }
 
index 8eef8d4..11806fe 100644 (file)
@@ -48,13 +48,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.Before;
 import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
 
-import static org.apache.avro.generic.GenericData.Record;
 import static org.junit.Assert.fail;
 
 /**
@@ -298,21 +295,11 @@ public class TestMerge extends BaseSqoopTestCase {
     return false;
   }
 
-  private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record) throws IOException
-  {
-    Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
-    DatasetReader<Record> datasetReader = null;
-    try {
-      datasetReader = parquetRecords.newReader();
-      for (GenericRecord genericRecord : datasetReader) {
-        if (valueMatches(genericRecord, record)) {
-          return true;
-        }
-      }
-    }
-    finally {
-      if (datasetReader != null) {
-        datasetReader.close();
+  private boolean checkParquetFileForLine(Path path, List<Integer> record) throws IOException {
+    List<GenericRecord> resultRecords = new ParquetReader(path.getParent()).readAll();
+    for (GenericRecord resultRecord : resultRecords) {
+      if (valueMatches(resultRecord, record)) {
+        return true;
       }
     }
 
@@ -330,7 +317,7 @@ public class TestMerge extends BaseSqoopTestCase {
         result = checkAvroFileForLine(fs, p, record);
         break;
       case ParquetFile:
-        result = checkParquetFileForLine(fs, p, record);
+        result = checkParquetFileForLine(p, record);
         break;
     }
     return result;
index c8bb663..43dabb5 100644 (file)
 
 package org.apache.sqoop;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.testutil.ExportJobTestCase;
 import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
@@ -28,7 +32,7 @@ import org.junit.Rule;
 
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.kitesdk.data.*;
+import parquet.avro.AvroParquetWriter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,9 +43,13 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
 
 
 /**
@@ -121,30 +129,45 @@ public class TestParquetExport extends ExportJobTestCase {
 
   /**
    * Create a data file that gets exported to the db.
-   * @param fileNum the number of the file (for multi-file export)
+   * Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files
+   * but since we do not use Kite in our test cases anymore we generate the .metadata directory here.
    * @param numRecords how many records to write to the file.
    */
-  protected void createParquetFile(int fileNum, int numRecords,
+  protected void createParquetFile(int numRecords,
       ColumnGenerator... extraCols) throws IOException {
 
-    String uri = "dataset:file:" + getTablePath();
     Schema schema = buildSchema(extraCols);
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-      .schema(schema)
-      .format(Formats.PARQUET)
-      .build();
-    Dataset dataset = Datasets.create(uri, descriptor);
-    DatasetWriter writer = dataset.newWriter();
-    try {
+
+    createMetadataDir(schema);
+    String fileName = UUID.randomUUID().toString() + ".parquet";
+    Path filePath = new Path(getTablePath(), fileName);
+    try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) {
       for (int i = 0; i < numRecords; i++) {
         GenericRecord record = new GenericData.Record(schema);
         record.put("id", i);
         record.put("msg", getMsgPrefix() + i);
         addExtraColumns(record, i, extraCols);
-        writer.write(record);
+        parquetWriter.write(record);
       }
-    } finally {
-      writer.close();
+    }
+  }
+
+  private void createMetadataDir(Schema schema) throws IOException {
+    final String descriptorFileTemplate = "location=file\\:%s\n" +
+        "    version=1\n" +
+        "    compressionType=snappy\n" +
+        "    format=parquet\n";
+    Path metadataDirPath = new Path(getTablePath(), ".metadata");
+    Path schemaFile = new Path(metadataDirPath, "schema.avsc");
+    Path descriptorFile = new Path(metadataDirPath, "descriptor.properties");
+    FileSystem fileSystem = getTablePath().getFileSystem(new Configuration());
+    fileSystem.mkdirs(metadataDirPath);
+
+    try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) {
+      fileOs.write(schema.toString().getBytes());
+    }
+    try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) {
+      fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes());
     }
   }
 
@@ -352,7 +375,7 @@ public class TestParquetExport extends ExportJobTestCase {
       colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
           "a", "VARCHAR(8)"),
     };
-    createParquetFile(0, TOTAL_RECORDS, gens);
+    createParquetFile(TOTAL_RECORDS, gens);
     createTable(gens);
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(TOTAL_RECORDS);
@@ -372,7 +395,7 @@ public class TestParquetExport extends ExportJobTestCase {
     Schema schema =  Schema.createUnion(childSchemas);
     ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)");
     ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS, gen0, gen1);
+    createParquetFile(TOTAL_RECORDS, gen0, gen1);
     createTable(gen0, gen1);
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(TOTAL_RECORDS);
@@ -392,7 +415,7 @@ public class TestParquetExport extends ExportJobTestCase {
     record.put("myint", 100);
     // DB type is not used so can be anything:
     ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS,  gen);
+    createParquetFile(TOTAL_RECORDS,  gen);
     createTable(gen);
 
     thrown.expect(Exception.class);
@@ -409,7 +432,7 @@ public class TestParquetExport extends ExportJobTestCase {
     // the Parquet value will not be exported
     ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT),
         null, null);
-    createParquetFile(0, TOTAL_RECORDS, gen);
+    createParquetFile(TOTAL_RECORDS, gen);
     createTable(gen);
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(TOTAL_RECORDS);
@@ -419,7 +442,7 @@ public class TestParquetExport extends ExportJobTestCase {
   public void testParquetWithUpdateKey() throws IOException, SQLException {
     String[] argv = { "--update-key", "ID" };
     final int TOTAL_RECORDS = 1;
-    createParquetFile(0, TOTAL_RECORDS);
+    createParquetFile(TOTAL_RECORDS);
     createTableWithInsert();
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(getMsgPrefix() + "0");
@@ -432,7 +455,7 @@ public class TestParquetExport extends ExportJobTestCase {
     final int TOTAL_RECORDS = 2;
     // ColumnGenerator gen = colGenerator("100",
     // Schema.create(Schema.Type.STRING), null, "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS);
+    createParquetFile(TOTAL_RECORDS);
     createTableWithInsert();
 
     thrown.expect(Exception.class);
@@ -447,7 +470,7 @@ public class TestParquetExport extends ExportJobTestCase {
 
     // null Parquet schema means don't create an Parquet field
     ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS, gen);
+    createParquetFile(TOTAL_RECORDS, gen);
     createTable(gen);
 
     thrown.expect(Exception.class);
index 379529a..0f9c7f3 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -28,11 +30,12 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.Test;
-import org.kitesdk.data.CompressionType;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
+import parquet.format.CompressionCodec;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,7 +45,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -119,10 +121,16 @@ public class TestParquetImport extends ImportJobTestCase {
 
   @Test
   public void testDeflateCompression() throws IOException {
-    runParquetImportTest("deflate");
+    // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
+    // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
+    runParquetImportTest("deflate", "gzip");
   }
 
   private void runParquetImportTest(String codec) throws IOException {
+    runParquetImportTest(codec, codec);
+  }
+
+  private void runParquetImportTest(String codec, String expectedCodec) throws IOException {
     String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
         "VARBINARY(2)",};
     String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
@@ -131,7 +139,7 @@ public class TestParquetImport extends ImportJobTestCase {
     String [] extraArgs = { "--compression-codec", codec};
     runImport(getOutputArgv(true, extraArgs));
 
-    assertEquals(CompressionType.forName(codec), getCompressionType());
+    assertEquals(expectedCodec.toUpperCase(), getCompressionType());
 
     Schema schema = getSchema();
     assertEquals(Type.RECORD, schema.getType());
@@ -145,25 +153,21 @@ public class TestParquetImport extends ImportJobTestCase {
     checkField(fields.get(5), "DATA_COL5", Type.STRING);
     checkField(fields.get(6), "DATA_COL6", Type.BYTES);
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      GenericRecord record1 = reader.next();
-      assertNotNull(record1);
-      assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
-      assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
-      assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
-      assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
-      assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
-      assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
-      Object object = record1.get("DATA_COL6");
-      assertTrue(object instanceof ByteBuffer);
-      ByteBuffer b = ((ByteBuffer) object);
-      assertEquals((byte) 1, b.get(0));
-      assertEquals((byte) 2, b.get(1));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertNotNull(record1);
+    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+    assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
+    Object object = record1.get("DATA_COL6");
+    assertTrue(object instanceof ByteBuffer);
+    ByteBuffer b = ((ByteBuffer) object);
+    assertEquals((byte) 1, b.get(0));
+    assertEquals((byte) 2, b.get(1));
+    assertEquals(1, genericRecords.size());
   }
 
   @Test
@@ -181,15 +185,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(types.length, fields.size());
     checkField(fields.get(0), "DATA_COL0", Type.STRING);
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
+    assertEquals(1, genericRecords.size());
   }
 
   @Test
@@ -207,15 +206,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(types.length, fields.size());
     checkField(fields.get(0), "__NAME", Type.INT);
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals("__NAME", 1987, record1.get("__NAME"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals("__NAME", 1987, record1.get("__NAME"));
+    assertEquals(1, genericRecords.size());
   }
 
   @Test
@@ -233,15 +227,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(types.length, fields.size());
     checkField(fields.get(0), "TEST_P_A_R_QUET", Type.INT);
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
+    assertEquals(1, genericRecords.size());
   }
 
   @Test
@@ -252,15 +241,10 @@ public class TestParquetImport extends ImportJobTestCase {
 
     runImport(getOutputArgv(true, null));
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertNull(record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertNull(record1.get("DATA_COL0"));
+    assertEquals(1, genericRecords.size());
   }
 
   @Test
@@ -271,15 +255,10 @@ public class TestParquetImport extends ImportJobTestCase {
 
     runImport(getOutputQueryArgv(true, null));
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals(1, record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals(1, record1.get("DATA_COL0"));
+    assertEquals(1, genericRecords.size());
   }
 
   @Test
@@ -291,17 +270,12 @@ public class TestParquetImport extends ImportJobTestCase {
     runImport(getOutputArgv(true, null));
     runImport(getOutputArgv(true, new String[]{"--append"}));
 
-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals(1, record1.get("DATA_COL0"));
-      record1 = reader.next();
-      assertEquals(1, record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals(1, record1.get("DATA_COL0"));
+    record1 = genericRecords.get(1);
+    assertEquals(1, record1.get("DATA_COL0"));
+    assertEquals(2, genericRecords.size());
   }
 
   @Test
@@ -319,30 +293,26 @@ public class TestParquetImport extends ImportJobTestCase {
     }
   }
 
-  private CompressionType getCompressionType() {
-    return getDataset().getDescriptor().getCompressionType();
-  }
-
-  private Schema getSchema() {
-    return getDataset().getDescriptor().getSchema();
-  }
-
-  private DatasetReader<GenericRecord> getReader() {
-    return getDataset().newReader();
+  private String getCompressionType() {
+    ParquetMetadata parquetMetadata = getOutputMetadata();
+    CompressionCodec parquetCompressionCodec = parquetMetadata.getBlocks().get(0).getColumns().get(0).getCodec().getParquetCompressionCodec();
+    return parquetCompressionCodec.name();
   }
 
-  private Dataset<GenericRecord> getDataset() {
-    String uri = "dataset:file:" + getTablePath();
-    return Datasets.load(uri, GenericRecord.class);
+  private ParquetMetadata getOutputMetadata() {
+    try {
+      Configuration config = new Configuration();
+      FileStatus fileStatus = getTablePath().getFileSystem(config).getFileStatus(getTablePath());
+      List<Footer> footers = ParquetFileReader.readFooters(config, fileStatus, false);
+      return footers.get(0).getParquetMetadata();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
-  @Override
-  public void tearDown() {
-    super.tearDown();
-    String uri = "dataset:file:" + getTablePath();
-    if (Datasets.exists(uri)) {
-      Datasets.delete(uri);
-    }
+  private Schema getSchema() {
+    String schemaString = getOutputMetadata().getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
+    return new Schema.Parser().parse(schemaString);
   }
 
   private void checkField(Field field, String name, Type type) {
index 4e1f249..bc19b69 100644 (file)
@@ -23,14 +23,12 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.sqoop.Sqoop;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.avro.AvroSchemaMismatchException;
 import org.apache.sqoop.mapreduce.ParquetJob;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,11 +53,8 @@ import org.apache.sqoop.tool.ImportTool;
 import org.apache.sqoop.tool.SqoopTool;
 import org.apache.commons.cli.ParseException;
 import org.junit.rules.ExpectedException;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
 
+import static java.util.Collections.sort;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -301,43 +297,39 @@ public class TestHiveImport extends ImportJobTestCase {
 
     runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
         new ImportTool());
-    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
   }
 
-  private void verifyHiveDataset(String tableName, Object[][] valsArray) {
-    String datasetUri = String.format("dataset:hive:default/%s",
-        tableName.toLowerCase());
-    assertTrue(Datasets.exists(datasetUri));
-    Dataset dataset = Datasets.load(datasetUri);
-    assertFalse(dataset.isEmpty());
+  private void verifyHiveDataset(Object[][] valsArray) {
+    List<String> expected = getExpectedLines(valsArray);
+    List<String> result = new ParquetReader(getTablePath()).readAllInCsv();
 
-    DatasetReader<GenericRecord> reader = dataset.newReader();
-    try {
-      List<String> expectations = new ArrayList<String>();
-      if (valsArray != null) {
-        for (Object[] vals : valsArray) {
-          expectations.add(Arrays.toString(vals));
-        }
-      }
+    sort(expected);
+    sort(result);
+
+    assertEquals(expected, result);
+  }
 
-      while (reader.hasNext() && expectations.size() > 0) {
-        String actual = Arrays.toString(
-            convertGenericRecordToArray(reader.next()));
-        assertTrue("Expect record: " + actual, expectations.remove(actual));
+  private List<String> getExpectedLines(Object[][] valsArray) {
+    List<String> expectations = new ArrayList<>();
+    if (valsArray != null) {
+      for (Object[] vals : valsArray) {
+        expectations.add(toCsv(vals));
       }
-      assertFalse(reader.hasNext());
-      assertEquals(0, expectations.size());
-    } finally {
-      reader.close();
     }
+    return expectations;
   }
 
-  private static Object[] convertGenericRecordToArray(GenericRecord record) {
-    Object[] result = new Object[record.getSchema().getFields().size()];
-    for (int i = 0; i < result.length; i++) {
-      result[i] = record.get(i);
+  private String toCsv(Object[] vals) {
+    StringBuilder result = new StringBuilder();
+
+    for (Object val : vals) {
+      result.append(val).append(",");
     }
-    return result;
+
+    result.deleteCharAt(result.length() - 1);
+
+    return result.toString();
   }
 
   /** Test that table is created in hive with no data import. */
@@ -388,13 +380,13 @@ public class TestHiveImport extends ImportJobTestCase {
     ImportTool tool = new ImportTool();
 
     runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
-    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
 
     String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
     String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
     runImportTest(TABLE_NAME, types, valsToOverwrite, "",
         getArgv(false, extraArgsForOverwrite), tool);
-    verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
+    verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}});
   }
 
   @Test
@@ -430,7 +422,7 @@ public class TestHiveImport extends ImportJobTestCase {
             .name(getColName(2)).type().nullable().stringType().noDefault()
             .endRecord();
     String dataSetUri = "dataset:hive:/default/" + tableName;
-    ParquetJob.createDataset(dataSetSchema, Formats.PARQUET.getDefaultCompressionType(), dataSetUri);
+    ParquetJob.createDataset(dataSetSchema, ParquetJob.getCompressionType(new Configuration()), dataSetUri);
   }
 
   /**
@@ -448,11 +440,11 @@ public class TestHiveImport extends ImportJobTestCase {
     ImportTool tool = new ImportTool();
 
     runImportTest(TABLE_NAME, types, vals, "", args, tool);
-    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
 
     String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
     runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
-    verifyHiveDataset(TABLE_NAME, new Object[][] {
+    verifyHiveDataset(new Object[][] {
         {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
   }
 
diff --git a/src/test/org/apache/sqoop/util/ParquetReader.java b/src/test/org/apache/sqoop/util/ParquetReader.java
new file mode 100644 (file)
index 0000000..56e03a0
--- /dev/null
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.avro.AvroParquetReader;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+
+import static org.apache.sqoop.util.FileSystemUtil.isFile;
+import static org.apache.sqoop.util.FileSystemUtil.listFiles;
+
+public class ParquetReader implements AutoCloseable {
+
+  private final Path pathToRead;
+
+  private final Configuration configuration;
+
+  private final Deque<Path> filesToRead;
+
+  private parquet.hadoop.ParquetReader<GenericRecord> reader;
+
+  public ParquetReader(Path pathToRead, Configuration configuration) {
+    this.pathToRead = pathToRead;
+    this.configuration = configuration;
+    this.filesToRead = new ArrayDeque<>(determineFilesToRead());
+    initReader(filesToRead.removeFirst());
+  }
+
+  public ParquetReader(Path pathToRead) {
+    this(pathToRead, new Configuration());
+  }
+
+  public GenericRecord next() throws IOException {
+    GenericRecord result = reader.read();
+    if (result != null) {
+      return result;
+    }
+    if (!filesToRead.isEmpty()) {
+      initReader(filesToRead.removeFirst());
+      return next();
+    }
+
+    return null;
+  }
+
+  public List<GenericRecord> readAll() {
+    List<GenericRecord> result = new ArrayList<>();
+
+    GenericRecord record;
+    try {
+      while ((record = next()) != null) {
+        result.add(record);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      close();
+    }
+
+    return result;
+  }
+
+  public List<String> readAllInCsv() {
+    List<String> result = new ArrayList<>();
+
+    for (GenericRecord record : readAll()) {
+      result.add(convertToCsv(record));
+    }
+
+    return result;
+  }
+
+  private String convertToCsv(GenericRecord record) {
+    StringBuilder result = new StringBuilder();
+    for (int i = 0; i < record.getSchema().getFields().size(); i++) {
+      result.append(record.get(i));
+      result.append(",");
+    }
+    result.deleteCharAt(result.length() - 1);
+    return result.toString();
+  }
+
+  private void initReader(Path file) {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+      this.reader = AvroParquetReader.<GenericRecord>builder(file).build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Collection<Path> determineFilesToRead() {
+    try {
+      if (isFile(pathToRead, configuration)) {
+        return Collections.singletonList(pathToRead);
+      }
+
+      return listFiles(pathToRead, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}