SQOOP-3224: Mainframe FTP transfer should have an option to use binary mode for transfer
author Szabolcs Vasas <vasas@apache.org>
Thu, 23 Aug 2018 15:08:00 +0000 (17:08 +0200)
committer Szabolcs Vasas <vasas@apache.org>
Thu, 23 Aug 2018 15:08:00 +0000 (17:08 +0200)
(Chris Teoh via Szabolcs Vasas)

21 files changed:
build.xml
src/docs/user/import-mainframe.txt
src/java/org/apache/sqoop/SqoopOptions.java
src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java [new file with mode: 0644]
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetImportMapper.java
src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
src/java/org/apache/sqoop/tool/BaseSqoopTool.java
src/java/org/apache/sqoop/tool/ImportTool.java
src/java/org/apache/sqoop/tool/MainframeImportTool.java
src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
src/test/org/apache/sqoop/manager/mainframe/MainframeManagerImportTest.java
src/test/org/apache/sqoop/manager/mainframe/MainframeTestUtil.java
src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java [new file with mode: 0644]
src/test/org/apache/sqoop/tool/TestMainframeImportTool.java

index 084823c..cd2e9e2 100644 (file)
--- a/build.xml
+++ b/build.xml
   <property name="sqoop.test.mainframe.ftp.dataset.gdg" value="TSODIQ1.GDGTEXT" />
   <property name="sqoop.test.mainframe.ftp.dataset.gdg.filename" value="G0001V43" />
   <property name="sqoop.test.mainframe.ftp.dataset.gdg.md5" value="f0d0d171fdb8a03dbc1266ed179d7093" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.gdg" value="TSODIQ1.FOLDER" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.gdg.filename" value="G0002V45" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.gdg.md5" value="43eefbe34e466dd3f65a3e867a60809a" />
+  <property name="sqoop.test.mainframe.ftp.dataset.seq" value="TSODIQ1.GDGTEXT.G0001V43" />
+  <property name="sqoop.test.mainframe.ftp.dataset.seq.filename" value="G0001V43" />
+  <property name="sqoop.test.mainframe.ftp.dataset.seq.md5" value="f0d0d171fdb8a03dbc1266ed179d7093" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.seq" value="TSODIQ1.FOLDER.FOLDERTXT" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="FOLDERTXT" />
+  <property name="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="1591c0fcc718fda7e9c1f3561d232b2b" />
 
   <property name="s3.bucket.url" value="" />
   <property name="s3.generator.command" value="" />
       <sysproperty key="sqoop.test.mainframe.ftp.dataset.gdg" value="${sqoop.test.mainframe.ftp.dataset.gdg}" />
       <sysproperty key="sqoop.test.mainframe.ftp.dataset.gdg.filename" value="${sqoop.test.mainframe.ftp.dataset.gdg.filename}" />
       <sysproperty key="sqoop.test.mainframe.ftp.dataset.gdg.md5" value="${sqoop.test.mainframe.ftp.dataset.gdg.md5}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.gdg" value="${sqoop.test.mainframe.ftp.binary.dataset.gdg}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.gdg.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.gdg.filename}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.gdg.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.gdg.md5}" />
 
       <sysproperty key="s3.bucket.url" value="${s3.bucket.url}" />
       <sysproperty key="s3.generator.command" value="${s3.generator.command}" />
 
+      <sysproperty key="sqoop.test.mainframe.ftp.dataset.seq" value="${sqoop.test.mainframe.ftp.dataset.seq}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.dataset.seq.filename}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.dataset.seq.md5}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq" value="${sqoop.test.mainframe.ftp.binary.dataset.seq}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.filename" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.filename}" />
+      <sysproperty key="sqoop.test.mainframe.ftp.binary.dataset.seq.md5" value="${sqoop.test.mainframe.ftp.binary.dataset.seq.md5}" />
       <!-- Location of Hive logs -->
       <!--<sysproperty key="hive.log.dir"
                    value="${test.build.data}/sqoop/logs"/> -->
index abeb7cd..3ecfb7e 100644 (file)
@@ -49,6 +49,7 @@ Argument                          Description
 +\--as-sequencefile+              Imports data to SequenceFiles
 +\--as-textfile+                  Imports data as plain text (default)
 +\--as-parquetfile+               Imports data to Parquet Files
++\--as-binaryfile+                Imports data as binary files
 +\--delete-target-dir+            Delete the import target directory\
                                   if it exists
 +-m,\--num-mappers <n>+           Use 'n' map tasks to import in parallel
@@ -193,6 +194,26 @@ $ sqoop import-mainframe --dataset SomePDS --jar-file mydatatypes.jar \
 
 This command will load the +SomePDSType+ class out of +mydatatypes.jar+.
 
+Sqoop supports Generation Data Group and sequential datasets.
+The dataset type can be specified with the --datasettype option followed by one of:
+'p' for partitioned dataset (default)
+'g' for generation data group dataset
+'s' for sequential dataset
+
+In the case of generation data group datasets, Sqoop will retrieve only the latest
+generation of the dataset.
+
+In the case of sequential datasets, Sqoop will retrieve only the specified file.
+
+Datasets stored on tape volumes are supported by specifying --tape true.
+
+By default, mainframe datasets are assumed to be plain text; attempting to transfer
+binary datasets this way will result in data corruption.
+Binary datasets are supported by specifying --as-binaryfile and, optionally, --buffersize
+followed by a buffer size in bytes. By default, --buffersize is set to 32760 bytes.
+Altering the buffer size alters the number of records Sqoop reports to have imported,
+because it reads the binary dataset in chunks of --buffersize bytes: a larger buffer
+size yields fewer records, as the sketch below illustrates.
+
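To make the record-count arithmetic concrete, here is a minimal sketch (the
100,000-byte dataset size is an assumed figure, purely for illustration):

----
// Hypothetical illustration: Sqoop reports one record per buffer-sized chunk,
// plus one final partial chunk, so the count is a ceiling division.
public class BufferSizeRecordCount {
  public static void main(String[] args) {
    long datasetBytes = 100_000L; // assumed dataset size
    int defaultBuffer = 32760;    // the default --buffersize
    int largerBuffer = 64000;     // the buffer size used in a later example
    System.out.println((datasetBytes + defaultBuffer - 1) / defaultBuffer); // 4 records
    System.out.println((datasetBytes + largerBuffer - 1) / largerBuffer);   // 2 records
  }
}
----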
 Additional Import Configuration Properties
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 There are some additional properties which can be configured by modifying
@@ -228,6 +249,23 @@ $ sqoop import-mainframe --connect z390 --dataset EMPLOYEES \
 Enter password: (hidden)
 ----
 
+Import of a tape-based generation data group dataset using a password alias, writing out to
+an intermediate directory (--outdir) before moving it to the target directory (--target-dir).
+----
+$ sqoop import-mainframe --dataset SomeGdg --connect <host> --username myuser --password-alias \
+    mypasswordalias --datasettype g --tape true --outdir /tmp/imported/sqoop \
+    --target-dir /data/imported/mainframe/SomeGdg
+----
+
+Import of a tape-based binary generation data group dataset with a buffer size of 64000 bytes,
+using a password alias and writing out to an intermediate directory (--outdir) before moving
+it to the target directory (--target-dir).
+----
+$ sqoop import-mainframe --dataset SomeGdg --connect <host> --username myuser --password-alias \
+    mypasswordalias --datasettype g --tape true --as-binaryfile --buffersize 64000 --outdir /tmp/imported/sqoop \
+    --target-dir /data/imported/mainframe/SomeGdg
+----
+
 Controlling the import parallelism (using 8 parallel tasks):
 
 ----
index f97dbfd..f06872f 100644 (file)
@@ -87,7 +87,8 @@ public class SqoopOptions implements Cloneable {
     TextFile,
     SequenceFile,
     AvroDataFile,
-    ParquetFile
+    ParquetFile,
+    BinaryFile
   }
 
   /**
@@ -362,7 +363,12 @@ public class SqoopOptions implements Cloneable {
   // Indicates if the data set is on tape to use different FTP parser
   @StoredAsProperty("mainframe.input.dataset.tape")
   private String mainframeInputDatasetTape;
-
+  // Indicates if binary or ascii FTP transfer mode should be used
+  @StoredAsProperty("mainframe.ftp.transfermode")
+  private String mainframeFtpTransferMode;
+  // Buffer size to use when using binary FTP transfer mode
+  @StoredAsProperty("mainframe.ftp.buffersize")
+  private Integer bufferSize;
   // Accumulo home directory
   private String accumuloHome; // not serialized to metastore.
   // Zookeeper home directory
@@ -1162,6 +1168,11 @@ public class SqoopOptions implements Cloneable {
     this.escapeColumnMappingEnabled = true;
 
     this.parquetConfiguratorImplementation = HADOOP;
+
+    // set default transfer mode to ascii
+    this.mainframeFtpTransferMode = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII;
+    // set default buffer size for mainframe binary transfers
+    this.bufferSize = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
   }
 
   /**
@@ -2499,6 +2510,23 @@ public class SqoopOptions implements Cloneable {
   public void setMainframeInputDatasetTape(String txtIsFromTape) {
          mainframeInputDatasetTape = Boolean.valueOf(Boolean.parseBoolean(txtIsFromTape)).toString();
   }
+  // returns the configured binary transfer buffer size
+  public Integer getBufferSize() {
+    return bufferSize;
+  }
+
+  public void setMainframeFtpTransferMode(String transferMode) {
+    mainframeFtpTransferMode = transferMode;
+  }
+
+  public String getMainframeFtpTransferMode() {
+    return mainframeFtpTransferMode;
+  }
+
+  // sets the binary transfer buffer size, defaults to MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE
+  public void setBufferSize(int buf) {
+    bufferSize = buf;
+  }
 
   public static String getAccumuloHomeDefault() {
     // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
diff --git a/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/ByteKeyOutputFormat.java
new file mode 100644 (file)
index 0000000..f7427db
--- /dev/null
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An {@link OutputFormat} that writes binary files.
+ * Only writes the key. Does not write any delimiter/newline after the key.
+ */
+public class ByteKeyOutputFormat<K, V> extends RawKeyTextOutputFormat<K, V> {
+
+  // currently don't support compression
+  private static final String FILE_EXTENSION = "";
+
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    DataOutputStream ostream = getFSDataOutputStream(context,FILE_EXTENSION);
+    return new KeyRecordWriters.BinaryKeyRecordWriter<K,V>(ostream);
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java b/src/java/org/apache/sqoop/mapreduce/KeyRecordWriters.java
new file mode 100644 (file)
index 0000000..9630a81
--- /dev/null
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class KeyRecordWriters {
+  /**
+   * Base RecordWriter that writes the record key to the underlying output stream.
+   */
+
+  public static class GenericRecordWriter<K, V> extends RecordWriter<K, V> {
+    private static final String UTF8 = "UTF-8";
+
+    protected DataOutputStream out;
+
+    /**
+     * Write the object to the byte stream, handling Text as a special
+     * case.
+     *
+     * @param o the object to print
+     * @param value the corresponding value for key o
+     * @throws IOException if the write throws, we pass it on
+     */
+    protected void writeObject(Object o,Object value) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      } else {
+        out.write(o.toString().getBytes(UTF8));
+      }
+    }
+
+    @Override
+    public synchronized void write(K key, V value) throws IOException, InterruptedException {
+      writeObject(key,value);
+    }
+
+    @Override
+    public synchronized void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        out.close();
+    }
+  }
+
+  public static class RawKeyRecordWriter<K, V> extends GenericRecordWriter<K, V> {
+
+    public RawKeyRecordWriter(DataOutputStream out) {
+      this.out = out;
+    }
+  }
+
+  /**
+   * RecordWriter to write raw binary keys to binary files.
+   */
+  public static class BinaryKeyRecordWriter<K, V> extends GenericRecordWriter<K, V> {
+
+    public BinaryKeyRecordWriter(DataOutputStream out) {
+      this.out = out;
+    }
+
+    /**
+     * Write the key to the byte stream; only BytesWritable keys are
+     * written, anything else is silently skipped.
+     * @param o the object to print
+     * @throws IOException if the write throws, we pass it on
+     */
+    @Override
+    protected void writeObject(Object o, Object value) throws IOException {
+      if (o instanceof BytesWritable) {
+        BytesWritable to = (BytesWritable) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      }
+    }
+  }
+}
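As a sanity check on the new writer classes, here is a minimal standalone sketch (the
class name is hypothetical, not part of the patch) showing that BinaryKeyRecordWriter
emits only the raw key bytes, with no delimiter appended:

----
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.mapreduce.KeyRecordWriters;

public class BinaryKeyRecordWriterSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    KeyRecordWriters.BinaryKeyRecordWriter<BytesWritable, NullWritable> writer =
        new KeyRecordWriters.BinaryKeyRecordWriter<>(new DataOutputStream(bytes));
    writer.write(new BytesWritable(new byte[] {0x0d, 0x0a, 0x00}), NullWritable.get());
    writer.close(null); // close() only closes the stream, so a null context is fine
    System.out.println(bytes.size()); // prints 3: the key bytes are written verbatim
  }
}
----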
index fec34f2..8e81aa4 100644 (file)
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -38,47 +37,15 @@ import org.apache.hadoop.util.*;
  */
 public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
 
-  /**
-   * RecordWriter to write to plain text files.
-   */
-  public static class RawKeyRecordWriter<K, V> extends RecordWriter<K, V> {
-
-    private static final String UTF8 = "UTF-8";
-
-    protected DataOutputStream out;
-
-    public RawKeyRecordWriter(DataOutputStream out) {
-      this.out = out;
-    }
-
-    /**
-     * Write the object to the byte stream, handling Text as a special
-     * case.
-     * @param o the object to print
-     * @throws IOException if the write throws, we pass it on
-     */
-    private void writeObject(Object o) throws IOException {
-      if (o instanceof Text) {
-        Text to = (Text) o;
-        out.write(to.getBytes(), 0, to.getLength());
-      } else {
-        out.write(o.toString().getBytes(UTF8));
-      }
-    }
-
-    public synchronized void write(K key, V value) throws IOException {
-      writeObject(key);
-    }
-
-    public synchronized void close(TaskAttemptContext context)
-        throws IOException {
-      out.close();
-    }
-
+  protected FSDataOutputStream getFSDataOutputStream(TaskAttemptContext context, String ext) throws IOException {
+    Configuration conf = context.getConfiguration();
+    Path file = getDefaultWorkFile(context, ext);
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataOutputStream fileOut = fs.create(file, false);
+    return fileOut;
   }
 
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
+  protected DataOutputStream getOutputStream(TaskAttemptContext context) throws IOException {
     boolean isCompressed = getCompressOutput(context);
     Configuration conf = context.getConfiguration();
     String ext = "";
@@ -93,17 +60,18 @@ public class RawKeyTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
       ext = codec.getDefaultExtension();
     }
 
-    Path file = getDefaultWorkFile(context, ext);
-    FileSystem fs = file.getFileSystem(conf);
-    FSDataOutputStream fileOut = fs.create(file, false);
+    FSDataOutputStream fileOut = getFSDataOutputStream(context,ext);
     DataOutputStream ostream = fileOut;
 
     if (isCompressed) {
       ostream = new DataOutputStream(codec.createOutputStream(fileOut));
     }
-
-    return new RawKeyRecordWriter<K, V>(ostream);
+    return ostream;
   }
 
-}
-
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    DataOutputStream ostream = getOutputStream(context);
+    return new KeyRecordWriters.RawKeyRecordWriter<K, V>(ostream);
+  }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/AbstractMainframeDatasetImportMapper.java
new file mode 100644 (file)
index 0000000..9304fe2
--- /dev/null
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.mapreduce.AutoProgressMapper;
+
+import java.io.IOException;
+
+public abstract class AbstractMainframeDatasetImportMapper<KEY>
+  extends AutoProgressMapper<LongWritable, SqoopRecord, KEY, NullWritable> {
+
+  private MainframeDatasetInputSplit inputSplit;
+  private MultipleOutputs<KEY, NullWritable> multiFileWriter;
+  private long numberOfRecords;
+
+  public void map(LongWritable key,  SqoopRecord val, Context context)
+    throws IOException, InterruptedException {
+    String dataset = inputSplit.getCurrentDataset();
+    numberOfRecords++;
+    multiFileWriter.write(createOutKey(val), NullWritable.get(), dataset);
+  }
+
+  @Override
+  protected void setup(Context context)
+    throws IOException, InterruptedException {
+    super.setup(context);
+    inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
+    multiFileWriter = new MultipleOutputs<>(context);
+    numberOfRecords = 0;
+  }
+
+  @Override
+  protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+    super.cleanup(context);
+    multiFileWriter.close();
+    context.getCounter(
+      ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
+      ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
+      .increment(numberOfRecords);
+  }
+
+  protected abstract KEY createOutKey(SqoopRecord sqoopRecord);
+}
\ No newline at end of file
index ea54b07..9d6a2fe 100644 (file)
@@ -33,4 +33,15 @@ public class MainframeConfiguration
   public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
 
   public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
+
+  public static final String MAINFRAME_FTP_TRANSFER_MODE = "mainframe.ftp.transfermode";
+
+  public static final String MAINFRAME_FTP_TRANSFER_MODE_ASCII = "ascii";
+
+  public static final String MAINFRAME_FTP_TRANSFER_MODE_BINARY = "binary";
+
+  // this is the default buffer size used when doing binary ftp transfers from mainframe
+  public static final Integer MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE = 32760;
+
+  public static final String MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE = "mainframe.ftp.buffersize";
 }
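A short sketch (assuming a plain Hadoop Configuration object; not taken from the patch)
of how these keys are populated for a binary transfer, mirroring what MainframeImportJob
does further down:

----
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;

public class BinaryTransferConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // select binary FTP transfer mode instead of the ascii default
    conf.set(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
    // override the 32760-byte default chunk size
    conf.setInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE, 64000);
    System.out.println(conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE)); // binary
  }
}
----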
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryImportMapper.java
new file mode 100644 (file)
index 0000000..b2417b9
--- /dev/null
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.sqoop.lib.SqoopRecord;
+
+/**
+ * Mapper that writes mainframe dataset records in binary format to multiple files
+ * based on the key, which is the index of the datasets in the input split.
+ */
+public class MainframeDatasetBinaryImportMapper extends AbstractMainframeDatasetImportMapper<BytesWritable> {
+
+  @Override
+  protected BytesWritable createOutKey(SqoopRecord sqoopRecord) {
+    BytesWritable result = new BytesWritable();
+    byte[] bytes = (byte[]) sqoopRecord.getFieldMap().entrySet().iterator().next().getValue();
+    result.set(bytes,0, bytes.length);
+    return result;
+  }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetBinaryRecord.java
new file mode 100644 (file)
index 0000000..6e82798
--- /dev/null
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.lib.LargeObjectLoader;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.lib.SqoopRecord;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MainframeDatasetBinaryRecord extends SqoopRecord {
+
+  private byte[] field;
+
+  public Map<String, Object> getFieldMap() {
+    Map<String, Object> map = new HashMap<String, Object>();
+    map.put("fieldName", field);
+    return map;
+  }
+
+  public void setField(String fieldName, Object fieldVal) {
+    if (fieldVal instanceof byte[]) {
+      field = (byte[]) fieldVal;
+    }
+  }
+
+  public void setField(final byte[] val) {
+    this.field = val;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    in.readFully(field);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.write(field);
+  }
+
+  @Override
+  public void readFields(ResultSet rs) throws SQLException {
+    field = rs.getBytes(1);
+  }
+
+  @Override
+  public void write(PreparedStatement s) throws SQLException {
+    s.setBytes(1, field);
+  }
+
+  @Override
+  public String toString() {
+    return field.toString();
+  }
+
+  @Override
+  public int write(PreparedStatement stmt, int offset) throws SQLException {
+    return 0;
+  }
+
+  @Override
+  public String toString(DelimiterSet delimiters) {
+    return null;
+  }
+
+  @Override
+  public int getClassFormatVersion() {
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return field.hashCode();
+  }
+
+  public void parse(CharSequence s) {
+  }
+
+  public void parse(Text s) {
+  }
+
+  public void parse(byte[] s) {
+  }
+
+  public void parse(char[] s) {
+  }
+
+  public void parse(ByteBuffer s) {
+  }
+
+  public void parse(CharBuffer s) {
+  }
+
+  @Override
+  public void loadLargeObjects(LargeObjectLoader objLoader) throws SQLException, IOException, InterruptedException {
+
+  }
+}
\ No newline at end of file
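The record deliberately exposes a single byte[] payload through its field map, so callers
such as MainframeDatasetBinaryImportMapper can fetch it without knowing the field name.
A minimal sketch (hypothetical class name) of that round trip:

----
import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetBinaryRecord;

public class BinaryRecordSketch {
  public static void main(String[] args) {
    MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
    record.setField("ignored", new byte[] {0x01, 0x02, 0x03}); // the name is not stored
    byte[] payload = (byte[]) record.getFieldMap().values().iterator().next();
    System.out.println(payload.length); // prints 3
  }
}
----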
index 1f78384..78c4665 100644 (file)
 
 package org.apache.sqoop.mapreduce.mainframe;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedReader;
-import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +40,7 @@ public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
     extends MainframeDatasetRecordReader<T> {
   private FTPClient ftp = null;
   private BufferedReader datasetReader = null;
+  private BufferedInputStream inputStream = null;
 
   private static final Log LOG = LogFactory.getLog(
       MainframeDatasetFTPRecordReader.class.getName());
@@ -50,21 +53,27 @@ public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
 
     Configuration conf = getConfiguration();
     ftp = MainframeFTPClientUtils.getFTPConnection(conf);
+    initialize(ftp,conf);
+  }
+
+  public void initialize(FTPClient ftpClient, Configuration conf)
+    throws IOException {
+    ftp = ftpClient;
     if (ftp != null) {
-               String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
-               String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
-               MainframeDatasetPath p = null;
-               try {
-                       p = new MainframeDatasetPath(dsName,conf);
-               } catch (Exception e) {
-                       LOG.error(e.getMessage());
-                       LOG.error("MainframeDatasetPath helper class incorrectly initialised");
-                       e.printStackTrace();
-               }
-               if (dsType != null && p != null) {
-                       dsName = p.getMainframeDatasetFolder();
-               }
-               ftp.changeWorkingDirectory("'" + dsName + "'");
+      String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+      String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+      MainframeDatasetPath p = null;
+      try {
+        p = new MainframeDatasetPath(dsName,conf);
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+        LOG.error("MainframeDatasetPath helper class incorrectly initialised");
+        e.printStackTrace();
+      }
+      if (dsType != null && p != null) {
+        dsName = p.getMainframeDatasetFolder();
+      }
+      ftp.changeWorkingDirectory("'" + dsName + "'");
     }
   }
 
@@ -80,6 +89,10 @@ public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
 
   protected boolean getNextRecord(T sqoopRecord) throws IOException {
     String line = null;
+    Configuration conf = getConfiguration();
+    if (MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY.equals(conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE))) {
+      return getNextBinaryRecord(sqoopRecord);
+    }
     try {
       do {
         if (datasetReader == null) {
@@ -112,9 +125,85 @@ public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
     return false;
   }
 
+  protected boolean getNextBinaryRecord(T sqoopRecord) throws IOException {
+    Configuration conf = getConfiguration();
+    // typical estimated max size for mainframe record
+    int BUFFER_SIZE = MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
+    if (conf != null) {
+      BUFFER_SIZE = conf.getInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE, MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+    }
+    byte[] buf = new byte[BUFFER_SIZE];
+    int bytesRead = -1;
+    int cumulativeBytesRead = 0;
+    try {
+      Boolean streamInited = initInputStream(BUFFER_SIZE);
+      if (!streamInited) {
+        LOG.info("No more datasets to process.");
+        return false;
+      }
+      do {
+        bytesRead = inputStream.read(buf,cumulativeBytesRead,BUFFER_SIZE-cumulativeBytesRead);
+        if (bytesRead == -1) {
+          // EOF
+          closeFtpInputStream();
+          LOG.info("Data transfer completed.");
+          return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord);
+        }
+        cumulativeBytesRead += bytesRead;
+        if (cumulativeBytesRead == BUFFER_SIZE) {
+          return writeBytesToSqoopRecord(buf,cumulativeBytesRead,sqoopRecord);
+        }
+      } while (bytesRead != -1);
+    } catch (IOException ioe) {
+      throw new IOException("IOException during data transfer: " + ioe);
+    }
+    return false;
+  }
+
+  protected Boolean initInputStream(int bufferSize) throws IOException {
+    if (inputStream == null) {
+      String dsName = getNextDataset();
+      if (dsName == null) {
+        LOG.info("No more datasets to process. Returning.");
+        return false;
+      }
+      LOG.info("Attempting to retrieve file stream for: "+dsName);
+      LOG.info("Buffer size: "+bufferSize);
+      inputStream = new BufferedInputStream(ftp.retrieveFileStream(dsName));
+      if (inputStream == null) {
+        throw new IOException("Failed to retrieve FTP file stream.");
+      }
+    }
+    return true;
+  }
+
+  protected void closeFtpInputStream() throws IOException {
+    inputStream.close();
+    inputStream = null;
+    if (!ftp.completePendingCommand()) {
+      throw new IOException("Failed to complete ftp command. FTP Response: "+ftp.getReplyString());
+    }
+  }
+
+  protected Boolean writeBytesToSqoopRecord(byte[] buf, int cumulativeBytesRead, SqoopRecord sqoopRecord) {
+    if (cumulativeBytesRead <= 0) {
+      return false;
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(cumulativeBytesRead);
+    buffer.put(buf,0,cumulativeBytesRead);
+    convertToSqoopRecord(buffer.array(), sqoopRecord);
+    return true;
+  }
+
   private void convertToSqoopRecord(String line,  SqoopRecord sqoopRecord) {
     String fieldName
         = sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
     sqoopRecord.setField(fieldName, line);
   }
+
+  private void convertToSqoopRecord(byte[] buf,  SqoopRecord sqoopRecord) {
+    String fieldName
+      = sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
+    sqoopRecord.setField(fieldName, buf);
+  }
 }
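The chunking policy in getNextBinaryRecord can be shown in isolation. This is a simplified
standalone sketch (plain java.io, no FTP) of the same loop shape: accumulate read() calls
until the buffer is full, emit one record per full buffer, and emit one final partial
record at EOF:

----
import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class ChunkedReadSketch {
  public static void main(String[] args) throws Exception {
    int bufferSize = 8;                                      // stand-in for --buffersize
    InputStream in = new ByteArrayInputStream(new byte[20]); // a 20-byte "dataset"
    byte[] buf = new byte[bufferSize];
    int records = 0;
    int filled = 0;
    int n;
    while ((n = in.read(buf, filled, bufferSize - filled)) != -1) {
      filled += n;
      if (filled == bufferSize) { // a full record
        records++;
        filled = 0;
      }
    }
    if (filled > 0) { // trailing partial record at EOF
      records++;
    }
    System.out.println(records); // prints 3: two full 8-byte records and one 4-byte record
  }
}
----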
index 0b7b5b8..0510e82 100644 (file)
 
 package org.apache.sqoop.mapreduce.mainframe;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-
-import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.lib.SqoopRecord;
-import org.apache.sqoop.mapreduce.AutoProgressMapper;
 
 /**
  * Mapper that writes mainframe dataset records in Text format to multiple files
  * based on the key, which is the index of the datasets in the input split.
  */
-public class MainframeDatasetImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopRecord, Text, NullWritable> {
-
-  private static final Log LOG = LogFactory.getLog(
-      MainframeDatasetImportMapper.class.getName());
-
-  private MainframeDatasetInputSplit inputSplit;
-  private MultipleOutputs<Text, NullWritable> mos;
-  private long numberOfRecords;
-  private Text outkey;
-
-  public void map(LongWritable key,  SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-    String dataset = inputSplit.getCurrentDataset();
-    outkey.set(val.toString());
-    numberOfRecords++;
-    mos.write(outkey, NullWritable.get(), dataset);
-  }
-
-  @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    super.setup(context);
-    inputSplit = (MainframeDatasetInputSplit)context.getInputSplit();
-    mos = new MultipleOutputs<Text, NullWritable>(context);
-    numberOfRecords = 0;
-    outkey = new Text();
-  }
+public class MainframeDatasetImportMapper extends AbstractMainframeDatasetImportMapper<Text> {
 
   @Override
-  protected void cleanup(Context context)
-      throws IOException, InterruptedException {
-    super.cleanup(context);
-    mos.close();
-    context.getCounter(
-        ConfigurationConstants.COUNTER_GROUP_MAPRED_TASK_COUNTERS,
-        ConfigurationConstants.COUNTER_MAP_OUTPUT_RECORDS)
-        .increment(numberOfRecords);
+  protected Text createOutKey(SqoopRecord sqoopRecord) {
+    Text result = new Text();
+    result.set(sqoopRecord.toString());
+    return result;
   }
 }
index 8ef30d3..90dc2dd 100644 (file)
@@ -22,14 +22,19 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.manager.ImportJobContext;
-
+import org.apache.sqoop.mapreduce.DBWritable;
 import org.apache.sqoop.mapreduce.DataDrivenImportJob;
+import org.apache.sqoop.mapreduce.RawKeyTextOutputFormat;
+import org.apache.sqoop.mapreduce.ByteKeyOutputFormat;
 import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 
 /**
@@ -46,7 +51,10 @@ public class MainframeImportJob extends DataDrivenImportJob {
 
   @Override
   protected Class<? extends Mapper> getMapperClass() {
-    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+    if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout())) {
+      LOG.debug("Using MainframeDatasetBinaryImportMapper");
+      return MainframeDatasetBinaryImportMapper.class;
+    } else if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
       return MainframeDatasetImportMapper.class;
     } else {
       return super.getMapperClass();
@@ -66,13 +74,58 @@ public class MainframeImportJob extends DataDrivenImportJob {
     job.getConfiguration().set(
             MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
             options.getMainframeInputDatasetTape().toString());
+    if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
+      job.getConfiguration().set(
+        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
+        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
+      job.getConfiguration().setInt(
+        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,
+        options.getBufferSize()
+      );
+    } else {
+      job.getConfiguration().set(
+        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
+        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII);
+    }
+
   }
 
   @Override
   protected void configureOutputFormat(Job job, String tableName,
       String tableClassName) throws ClassNotFoundException, IOException {
     super.configureOutputFormat(job, tableName, tableClassName);
+    job.getConfiguration().set(
+      MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE,
+      options.getMainframeFtpTransferMode());
+    // use the default outputformat
     LazyOutputFormat.setOutputFormatClass(job, getOutputFormatClass());
   }
 
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws IOException {
+    super.configureMapper(job, tableName, tableClassName);
+    if (SqoopOptions.FileLayout.BinaryFile == options.getFileLayout()) {
+      job.setOutputKeyClass(BytesWritable.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      // this is required as code generated class assumes setField method takes String
+      // and will fail with ClassCastException when a byte array is passed instead
+      // java.lang.ClassCastException: [B cannot be cast to java.lang.String
+      Configuration conf = job.getConfiguration();
+      conf.setClass(org.apache.sqoop.mapreduce.db.DBConfiguration.INPUT_CLASS_PROPERTY, MainframeDatasetBinaryRecord.class,
+        DBWritable.class);
+    }
+  }
+
+  @Override
+  protected Class<? extends OutputFormat> getOutputFormatClass() {
+    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
+      return RawKeyTextOutputFormat.class;
+    } else if (options.getFileLayout()
+        == SqoopOptions.FileLayout.BinaryFile) {
+      return ByteKeyOutputFormat.class;
+    }
+    return null;
+  }
 }
index 9dcbdd5..b47be72 100644 (file)
@@ -114,6 +114,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
   public static final String FMT_TEXTFILE_ARG = "as-textfile";
   public static final String FMT_AVRODATAFILE_ARG = "as-avrodatafile";
   public static final String FMT_PARQUETFILE_ARG = "as-parquetfile";
+  public static final String FMT_BINARYFILE_ARG = "as-binaryfile";
   public static final String HIVE_IMPORT_ARG = "hive-import";
   public static final String HIVE_TABLE_ARG = "hive-table";
   public static final String HIVE_DATABASE_ARG = "hive-database";
index 478f174..1397337 100644 (file)
@@ -744,6 +744,10 @@ public class ImportTool extends BaseSqoopTool {
         .withDescription("Imports data to Parquet files")
         .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
         .create());
+    importOpts.addOption(OptionBuilder
+      .withDescription("Imports data to Binary files")
+      .withLongOpt(BaseSqoopTool.FMT_BINARYFILE_ARG)
+      .create());
     importOpts.addOption(OptionBuilder.withArgName("n")
         .hasArg().withDescription("Use 'n' map tasks to import in parallel")
         .withLongOpt(NUM_MAPPERS_ARG)
index cdd9d6d..fbc8c3d 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.sqoop.tool;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.ToolRunner;
@@ -40,6 +41,7 @@ public class MainframeImportTool extends ImportTool {
   public static final String DS_ARG = "dataset";
   public static final String DS_TYPE_ARG = "datasettype";
   public static final String DS_TAPE_ARG = "tape";
+  public static final String BUFFERSIZE_ARG = "buffersize";
 
   public MainframeImportTool() {
     super("import-mainframe", false);
@@ -82,6 +84,14 @@ public class MainframeImportTool extends ImportTool {
         .withDescription("Imports data as plain text (default)")
         .withLongOpt(FMT_TEXTFILE_ARG)
         .create());
+    importOpts.addOption(OptionBuilder
+      .withDescription("Imports data as binary")
+      .withLongOpt(FMT_BINARYFILE_ARG)
+      .create());
+    importOpts.addOption(OptionBuilder
+      .hasArg().withDescription("Sets buffer size for binary import in bytes (default=32760 bytes)")
+      .withLongOpt(BUFFERSIZE_ARG)
+      .create());
     importOpts.addOption(OptionBuilder.withArgName("n")
         .hasArg().withDescription("Use 'n' map tasks to import in parallel")
         .withLongOpt(NUM_MAPPERS_ARG)
@@ -168,6 +178,28 @@ public class MainframeImportTool extends ImportTool {
        // set default tape value to false
        out.setMainframeInputDatasetTape("false");
     }
+    if (in.hasOption(FMT_BINARYFILE_ARG)) {
+      out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY);
+      out.setFileLayout(SqoopOptions.FileLayout.BinaryFile);
+    } else {
+      // set default transfer mode to ascii
+      out.setMainframeFtpTransferMode(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_ASCII);
+      out.setFileLayout(SqoopOptions.FileLayout.TextFile);
+    }
+
+    if (in.hasOption(BUFFERSIZE_ARG)) {
+      // if we specify --buffersize set the buffer size
+      int bufSize = Integer.valueOf(in.getOptionValue(BUFFERSIZE_ARG));
+      if (bufSize > 0) {
+        out.setBufferSize(bufSize);
+      }
+      else {
+        out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+      }
+    } else {
+      // set the default buffer size to 32760 bytes
+      out.setBufferSize(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+    }
   }
 
   @Override
@@ -190,6 +222,17 @@ public class MainframeImportTool extends ImportTool {
                throw new InvalidOptionsException(
                                "--" + DS_TAPE_ARG + " specified is invalid. " + HELP_STR);
        }
+    /* only allow FileLayout.BinaryFile to be selected for mainframe import */
+    if (SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && StringUtils.isEmpty(options.getMainframeInputDatasetName())) {
+      throw new InvalidOptionsException("--as-binaryfile should only be used with import-mainframe module.");
+    }
+
+    // only allow the buffer size to differ from the default when --as-binaryfile is selected;
+    // without --as-binaryfile, --buffersize would have no effect, so reject it explicitly
+    if (!SqoopOptions.FileLayout.BinaryFile.equals(options.getFileLayout()) && !MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals(options.getBufferSize())) {
+      throw new InvalidOptionsException("--buffersize should only be used with --as-binaryfile parameter.");
+    }
+
     super.validateImportOptions(options);
   }
 }
index 95bc0ec..654721e 100644 (file)
@@ -23,6 +23,7 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.net.PrintCommandListener;
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPClient;
@@ -207,8 +208,18 @@ public final class MainframeFTPClientUtils {
         throw new IOException("Could not login to server " + server
             + ":" + ftp.getReplyString());
       }
-      // set ASCII transfer mode
-      ftp.setFileType(FTP.ASCII_FILE_TYPE);
+      // set transfer mode
+      String transferMode = conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE);
+      if (StringUtils.equalsIgnoreCase(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY,transferMode)) {
+        LOG.info("Setting FTP transfer mode to binary");
+        // ftp.setFileTransferMode(FTP.BINARY_FILE_TYPE) doesn't work for MVS, it throws a syntax error
+        ftp.sendCommand("TYPE I");
+        // this is IMPORTANT - without it the client converts 0x0d0a to 0x0a, dropping bytes
+        ftp.setFileType(FTP.BINARY_FILE_TYPE);
+      } else {
+        LOG.info("Defaulting FTP transfer mode to ascii");
+        ftp.setFileTransferMode(FTP.ASCII_FILE_TYPE);
+      }
       // Use passive mode as default.
       ftp.enterLocalPassiveMode();
       LOG.info("System type detected: " + ftp.getSystemType());
index 041dfb7..3b8ed23 100644 (file)
@@ -113,6 +113,42 @@ public class MainframeManagerImportTest extends ImportJobTestCase {
     doImportAndVerify(MainframeTestUtil.GDG_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files);
   }
 
+  @Test
+  public void testImportGdgBinary() throws IOException {
+    HashMap<String,String> files = new HashMap<String,String>();
+    files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile");
+  }
+
+  @Test
+  public void testImportGdgBinaryWithBufferSize() throws IOException {
+    HashMap<String,String> files = new HashMap<String,String>();
+    files.put(MainframeTestUtil.GDG_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_GDG_BINARY_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.GDG_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG, files, "--as-binaryfile", "--buffersize", "64000");
+  }
+
+  @Test
+  public void testImportSequential() throws IOException {
+    // the plain text GDG dataset can be reused here, since a single generation is itself a sequential dataset
+    HashMap<String,String> files = new HashMap<String,String>();
+    files.put(MainframeTestUtil.SEQ_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.SEQ_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files);
+  }
+
+  @Test
+  public void testImportSequentialBinary() throws IOException {
+    HashMap<String,String> files = new HashMap<String,String>();
+    files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile");
+  }
+
+  @Test
+  public void testImportSequentialBinaryWithBufferSize() throws IOException {
+    HashMap<String,String> files = new HashMap<String,String>();
+    files.put(MainframeTestUtil.SEQ_BINARY_DATASET_FILENAME, MainframeTestUtil.EXPECTED_SEQ_BINARY_DATASET_MD5);
+    doImportAndVerify(MainframeTestUtil.SEQ_BINARY_DATASET_NAME, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL, files, "--as-binaryfile", "--buffersize", "64000");
+  }
+
   private String [] getArgv(String datasetName, String datasetType, String ... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();
 
@@ -130,7 +166,6 @@ public class MainframeManagerImportTest extends ImportJobTestCase {
     args.add(datasetType);
 
     if (extraArgs.length > 0) {
-      args.add("--");
       for (String arg : extraArgs) {
         args.add(arg);
       }
index f28ff36..9f86f6c 100644 (file)
@@ -41,4 +41,31 @@ public class MainframeTestUtil {
   public static final String EXPECTED_GDG_DATASET_MD5 = System.getProperty(
       "sqoop.test.mainframe.ftp.dataset.gdg.md5",
       "f0d0d171fdb8a03dbc1266ed179d7093");
+  public static final String GDG_BINARY_DATASET_NAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.gdg",
+      "TSODIQ1.FOLDER");
+  public static final String GDG_BINARY_DATASET_FILENAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.gdg.filename",
+      "G0002V45");
+  public static final String EXPECTED_GDG_BINARY_DATASET_MD5 = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.gdg.md5",
+      "43eefbe34e466dd3f65a3e867a60809a");
+  public static final String SEQ_DATASET_NAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.dataset.seq",
+      "TSODIQ1.GDGTEXT.G0001V43");
+  public static final String SEQ_DATASET_FILENAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.dataset.seq.filename",
+      "G0001V43");
+  public static final String EXPECTED_SEQ_DATASET_MD5 = System.getProperty(
+      "sqoop.test.mainframe.ftp.dataset.seq.md5",
+      "f0d0d171fdb8a03dbc1266ed179d7093");
+  public static final String SEQ_BINARY_DATASET_NAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.seq",
+      "TSODIQ1.FOLDER.FOLDERTXT");
+  public static final String SEQ_BINARY_DATASET_FILENAME = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.seq.filename",
+      "FOLDERTXT");
+  public static final String EXPECTED_SEQ_BINARY_DATASET_MD5 = System.getProperty(
+      "sqoop.test.mainframe.ftp.binary.dataset.seq.md5",
+      "1591c0fcc718fda7e9c1f3561d232b2b");
 }
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetBinaryRecord.java
new file mode 100644 (file)
index 0000000..b4cba28
--- /dev/null
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.mainframe;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doReturn;
+
+public class TestMainframeDatasetBinaryRecord {
+
+  private MainframeDatasetFTPRecordReader ftpRecordReader;
+  private InputStream is;
+  private FTPClient ftp;
+  private final String DATASET_NAME = "dummy.ds";
+  private final String DATASET_TYPE = "g";
+  private static final Log LOG = LogFactory.getLog(
+      TestMainframeDatasetBinaryRecord.class.getName());
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    MainframeDatasetFTPRecordReader rdr = new MainframeDatasetFTPRecordReader();
+    Configuration conf;
+    MainframeDatasetInputSplit split;
+    TaskAttemptContext context;
+    ftpRecordReader = spy(rdr);
+    is = mock(InputStream.class);
+    ftp = mock(FTPClient.class);
+    split = mock(MainframeDatasetInputSplit.class);
+    context = mock(TaskAttemptContext.class);
+    conf = new Configuration();
+    when(ftp.retrieveFileStream(any(String.class))).thenReturn(is);
+    when(ftp.changeWorkingDirectory(any(String.class))).thenReturn(true);
+    doReturn("file1").when(ftpRecordReader).getNextDataset();
+    when(split.getNextDataset()).thenReturn(DATASET_NAME);
+    when(ftpRecordReader.getNextDataset()).thenReturn(DATASET_NAME);
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,DATASET_NAME);
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,DATASET_TYPE);
+    conf.setInt(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
+    ftpRecordReader.initialize(ftp, conf);
+  }
+
+  // Mockito Answer used to mock InputStream.read, reporting the given number of bytes read
+  protected Answer returnSqoopRecord(final int byteLength) {
+    return new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+        return byteLength;
+      }
+    };
+  }
+
+  @Test
+  public void testGetNextBinaryRecordForFullRecord() {
+
+    MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+    try {
+      when(is.read(any(byte[].class),anyInt(),anyInt()))
+        .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE))
+        .thenReturn(-1);
+      when(ftp.completePendingCommand()).thenReturn(true);
+      Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+      Assert.assertFalse(record.getFieldMap().values().isEmpty());
+      Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length);
+    } catch (IOException ioe) {
+      LOG.error("Issue with reading 1 full binary buffer record", ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Test
+  public void testGetNextBinaryRecordForPartialRecord() {
+    int expectedBytesRead = 10;
+    MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+    try {
+      when(is.read(any(byte[].class),anyInt(),anyInt()))
+        .thenAnswer(returnSqoopRecord(10))
+        .thenReturn(-1);
+      when(ftp.completePendingCommand()).thenReturn(true);
+      Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+      Assert.assertFalse(record.getFieldMap().values().isEmpty());
+      Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length));
+    } catch (IOException ioe) {
+      LOG.error("Issue with reading 10 byte binary record", ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Test
+  public void testGetNextBinaryRecordFor2Records() {
+    // test 1 full record, and 1 partial
+    int expectedBytesRead = 10;
+    MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+    try {
+      when(is.read(any(byte[].class),anyInt(),anyInt()))
+        .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE))
+        .thenAnswer(returnSqoopRecord(10))
+        .thenReturn(-1);
+      when(ftp.completePendingCommand()).thenReturn(true);
+      Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+      Assert.assertFalse(record.getFieldMap().values().isEmpty());
+      Assert.assertTrue(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.equals((((byte[])record.getFieldMap().values().iterator().next()).length)));
+      record = new MainframeDatasetBinaryRecord();
+      Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+      Assert.assertFalse(record.getFieldMap().values().isEmpty());
+      Assert.assertEquals(expectedBytesRead,(((byte[])record.getFieldMap().values().iterator().next()).length));
+    } catch (IOException ioe) {
+      LOG.error("Issue with reading 1 full binary buffer record followed by 1 partial binary buffer record", ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Test
+  public void testGetNextBinaryRecordForMultipleReads() {
+    // test reading 1 record where the stream returns less than a full buffer
+    MainframeDatasetBinaryRecord record = new MainframeDatasetBinaryRecord();
+    try {
+      when(is.read(any(byte[].class),anyInt(),anyInt()))
+        .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2))
+        .thenAnswer(returnSqoopRecord(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE /2))
+        .thenReturn(-1);
+      when(ftp.completePendingCommand()).thenReturn(true);
+      Assert.assertTrue(ftpRecordReader.getNextBinaryRecord(record));
+      Assert.assertFalse(record.getFieldMap().values().isEmpty());
+      Assert.assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE.intValue(),((byte[])record.getFieldMap().values().iterator().next()).length);
+      record = new MainframeDatasetBinaryRecord();
+      Assert.assertFalse(ftpRecordReader.getNextBinaryRecord(record));
+      Assert.assertNull((((byte[])record.getFieldMap().values().iterator().next())));
+    } catch (IOException ioe) {
+      LOG.error("Issue with verifying reading partial buffer binary records", ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+}
index 0b0c6c3..c2edc53 100644 (file)
@@ -22,18 +22,18 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.cli.RelatedOptions;
 import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
 import org.apache.sqoop.cli.ToolOptions;
 import org.apache.sqoop.testutil.BaseSqoopTestCase;
+import org.junit.rules.ExpectedException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -43,15 +43,16 @@ import static org.junit.Assert.assertTrue;
 
 public class TestMainframeImportTool extends BaseSqoopTestCase {
 
-  private static final Log LOG = LogFactory.getLog(TestMainframeImportTool.class
-      .getName());
-
   private MainframeImportTool mfImportTool;
+  private ToolOptions toolOptions;
+  private SqoopOptions sqoopOption;
 
   @Before
   public void setUp() {
 
     mfImportTool = new MainframeImportTool();
+    toolOptions = new ToolOptions();
+    sqoopOption = new SqoopOptions();
   }
 
   @After
@@ -183,4 +184,58 @@ public class TestMainframeImportTool extends BaseSqoopTestCase {
          Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
          assert(isTape != null && isTape.toString().equals("false"));
   }
+
+  @Test
+  public void testFtpTransferModeAscii() throws ParseException, InvalidOptionsException {
+    String[] args = new String[] { "--dataset", "mydatasetname", "--as-textfile" };
+    configureAndValidateOptions(args);
+    assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout());
+  }
+  @Test
+  public void testFtpTransferModeDefaultsToAscii() throws ParseException, InvalidOptionsException {
+    String[] args = new String[] { "--dataset", "mydatasetname" };
+    configureAndValidateOptions(args);
+    assertEquals(SqoopOptions.FileLayout.TextFile,sqoopOption.getFileLayout());
+  }
+
+  @Test
+  public void testAsBinaryFileSetsCorrectFileLayoutAndDefaultBufferSize() throws ParseException, InvalidOptionsException {
+    String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile" };
+    configureAndValidateOptions(args);
+    assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout());
+    assertEquals(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE, sqoopOption.getBufferSize());
+  }
+
+  @Test
+  public void testSetBufferSize() throws ParseException, InvalidOptionsException {
+    final Integer EXPECTED_BUFFER = 1024;
+    String[] args = new String[] { "--dataset", "mydatasetname", "--as-binaryfile", "--buffersize", EXPECTED_BUFFER.toString() };
+    configureAndValidateOptions(args);
+    assertEquals(SqoopOptions.FileLayout.BinaryFile,sqoopOption.getFileLayout());
+    assertEquals(EXPECTED_BUFFER, sqoopOption.getBufferSize());
+  }
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testBufferSizeWithoutBinaryThrowsException() throws ParseException, InvalidOptionsException {
+    final Integer EXPECTED_BUFFER = 1024;
+    String[] args = new String[] { "--buffersize", EXPECTED_BUFFER.toString() };
+    exception.expect(InvalidOptionsException.class);
+    configureAndValidateOptions(args);
+  }
+
+  @Test
+  public void testInvalidBufferSizeThrowsNumberFormatException() throws ParseException, InvalidOptionsException {
+    String[] args = new String[] { "--buffersize", "invalidinteger" };
+    exception.expect(NumberFormatException.class);
+    configureAndValidateOptions(args);
+  }
+
+  private void configureAndValidateOptions(String[] args) throws ParseException, InvalidOptionsException {
+    mfImportTool.configureOptions(toolOptions);
+    sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+    mfImportTool.validateImportOptions(sqoopOption);
+  }
 }