GIRAPH-1181
authorMaja Kabiljo <majakabiljo@fb.com>
Wed, 4 Apr 2018 22:22:24 +0000 (15:22 -0700)
committerMaja Kabiljo <majakabiljo@fb.com>
Wed, 4 Apr 2018 22:23:49 +0000 (15:23 -0700)
closes #65

12 files changed:
giraph-core/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java

index e4f782e..20ce426 100644 (file)
@@ -19,8 +19,8 @@
 package org.apache.giraph.comm.aggregators;
 
 import java.io.IOException;
-import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 
 /**
  * Wrapper for output stream which keeps the place in the beginning for the
@@ -36,7 +36,7 @@ public abstract class CountingOutputStream {
    * Default constructor
    */
   public CountingOutputStream() {
-    dataOutput = new ExtendedByteArrayDataOutput();
+    dataOutput = new UnsafeByteArrayOutputStream();
     reset();
   }
 
index 7107228..d372bf2 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm.requests;
 
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -64,6 +66,14 @@ public abstract class ByteArrayRequest extends WritableRequest {
     return new DataInputStream(new ByteArrayInputStream(data));
   }
 
+  /**
+   * Wraps the byte array with UnsafeByteArrayInputStream stream.
+   * @return UnsafeByteArrayInputStream
+   */
+  public UnsafeByteArrayInputStream getUnsafeByteArrayInput() {
+    return new UnsafeByteArrayInputStream(data);
+  }
+
   @Override
   void readFieldsRequest(DataInput input) throws IOException {
     int dataLength = input.readInt();
index 8f168a2..de54188 100644 (file)
 
 package org.apache.giraph.comm.requests;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 import org.apache.giraph.utils.WritableUtils;
@@ -59,7 +59,7 @@ public class SendAggregatorsToOwnerRequest
     UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
     UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();
 
-    DataInput input = getDataInput();
+    UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
     AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
     try {
       int num = input.readInt();
index 361bdc9..ee7ac72 100644 (file)
 
 package org.apache.giraph.comm.requests;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -53,7 +53,7 @@ public class SendAggregatorsToWorkerRequest extends
 
   @Override
   public void doRequest(ServerData serverData) {
-    DataInput input = getDataInput();
+    UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
     AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
     try {
       int num = input.readInt();
index 3a1bd64..0ea737f 100644 (file)
@@ -47,7 +47,8 @@ public class SendReducedToMasterRequest extends ByteArrayRequest
   @Override
   public void doRequest(MasterGlobalCommHandler commHandler) {
     try {
-      commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
+      commHandler.getAggregatorHandler().
+              acceptReducedValues(getUnsafeByteArrayInput());
     } catch (IOException e) {
       throw new IllegalStateException("doRequest: " +
           "IOException occurred while processing request", e);
index 2f76e6e..7164cb2 100644 (file)
 
 package org.apache.giraph.comm.requests;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -52,7 +52,7 @@ public class SendWorkerAggregatorsRequest extends
 
   @Override
   public void doRequest(ServerData serverData) {
-    DataInput input = getDataInput();
+    UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
     OwnerAggregatorServerData aggregatorData =
         serverData.getOwnerAggregatorData();
     try {
index c5587e1..2b91502 100644 (file)
@@ -54,9 +54,6 @@ public class UnsafeArrayReads extends UnsafeReads {
   private static final long BYTE_ARRAY_OFFSET  =
       UNSAFE.arrayBaseOffset(byte[].class);
 
-  /** Byte buffer */
-  protected byte[] buf;
-
   /**
    * Constructor
    *
@@ -64,7 +61,7 @@ public class UnsafeArrayReads extends UnsafeReads {
    */
   public UnsafeArrayReads(byte[] buf) {
     super(buf.length);
-    this.buf = buf;
+    this.buffer = buf;
   }
 
   /**
@@ -76,12 +73,12 @@ public class UnsafeArrayReads extends UnsafeReads {
    */
   public UnsafeArrayReads(byte[] buf, int offset, int length) {
     super(offset, length);
-    this.buf = buf;
+    this.buffer = buf;
   }
 
   @Override
   public int available() {
-    return (int) (bufLength - pos);
+    return (int) (limit - position);
   }
 
   @Override
@@ -92,38 +89,38 @@ public class UnsafeArrayReads extends UnsafeReads {
 
   @Override
   public int getPos() {
-    return (int) pos;
+    return (int) position;
   }
 
   @Override
   public void readFully(byte[] b) throws IOException {
-    ensureRemaining(b.length);
-    System.arraycopy(buf, (int) pos, b, 0, b.length);
-    pos += b.length;
+    require(b.length);
+    System.arraycopy(buffer, (int) position, b, 0, b.length);
+    position += b.length;
   }
 
   @Override
   public void readFully(byte[] b, int off, int len) throws IOException {
-    ensureRemaining(len);
-    System.arraycopy(buf, (int) pos, b, off, len);
-    pos += len;
+    require(len);
+    System.arraycopy(buffer, (int) position, b, off, len);
+    position += len;
   }
 
   @Override
-  public boolean readBoolean() throws IOException {
-    ensureRemaining(SIZE_OF_BOOLEAN);
-    boolean value = UNSAFE.getBoolean(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_BOOLEAN;
+  public boolean readBoolean() {
+    require(SIZE_OF_BOOLEAN);
+    boolean value = UNSAFE.getBoolean(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_BOOLEAN;
     return value;
   }
 
   @Override
-  public byte readByte() throws IOException {
-    ensureRemaining(SIZE_OF_BYTE);
-    byte value = UNSAFE.getByte(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_BYTE;
+  public byte readByte() {
+    require(SIZE_OF_BYTE);
+    byte value = UNSAFE.getByte(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_BYTE;
     return value;
   }
 
@@ -133,11 +130,11 @@ public class UnsafeArrayReads extends UnsafeReads {
   }
 
   @Override
-  public short readShort() throws IOException {
-    ensureRemaining(SIZE_OF_SHORT);
-    short value = UNSAFE.getShort(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_SHORT;
+  public short readShort() {
+    require(SIZE_OF_SHORT);
+    short value = UNSAFE.getShort(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_SHORT;
     return value;
   }
 
@@ -147,47 +144,47 @@ public class UnsafeArrayReads extends UnsafeReads {
   }
 
   @Override
-  public char readChar() throws IOException {
-    ensureRemaining(SIZE_OF_CHAR);
-    char value = UNSAFE.getChar(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_CHAR;
+  public char readChar() {
+    require(SIZE_OF_CHAR);
+    char value = UNSAFE.getChar(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_CHAR;
     return value;
   }
 
   @Override
-  public int readInt() throws IOException {
-    ensureRemaining(SIZE_OF_INT);
-    int value = UNSAFE.getInt(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_INT;
+  public int readInt() {
+    require(SIZE_OF_INT);
+    int value = UNSAFE.getInt(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_INT;
     return value;
   }
 
   @Override
-  public long readLong() throws IOException {
-    ensureRemaining(SIZE_OF_LONG);
-    long value = UNSAFE.getLong(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_LONG;
+  public long readLong() {
+    require(SIZE_OF_LONG);
+    long value = UNSAFE.getLong(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_LONG;
     return value;
   }
 
   @Override
-  public float readFloat() throws IOException {
-    ensureRemaining(SIZE_OF_FLOAT);
-    float value = UNSAFE.getFloat(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_FLOAT;
+  public float readFloat() {
+    require(SIZE_OF_FLOAT);
+    float value = UNSAFE.getFloat(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_FLOAT;
     return value;
   }
 
   @Override
-  public double readDouble() throws IOException {
-    ensureRemaining(SIZE_OF_DOUBLE);
-    double value = UNSAFE.getDouble(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += SIZE_OF_DOUBLE;
+  public double readDouble() {
+    require(SIZE_OF_DOUBLE);
+    double value = UNSAFE.getDouble(buffer,
+        BYTE_ARRAY_OFFSET + position);
+    position += SIZE_OF_DOUBLE;
     return value;
   }
 
index c8a8cac..b9b1995 100644 (file)
@@ -20,6 +20,13 @@ package org.apache.giraph.utils;
 
 /**
  * UnsafeByteArrayInputStream
+ *
+ * This stream now extends com.esotericsoftware.kryo.io.Input so that kryo
+ * serialization can directly read from this stream without using an
+ * additional buffer, providing a faster serialization.
+
+ * Users of this class has to explicitly close the stream to avoid style check
+ * errors even though close is no-op when the underlying stream is not set.
  */
 public class UnsafeByteArrayInputStream extends UnsafeArrayReads {
 
index 27f1156..13bc7d2 100644 (file)
@@ -17,8 +17,9 @@
  */
 package org.apache.giraph.utils;
 
+import com.esotericsoftware.kryo.io.Output;
+
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -34,9 +35,16 @@ import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
 
 /**
  * Byte array output stream that uses Unsafe methods to serialize/deserialize
- * much faster
+ * much faster.
+ *
+ * This stream now extends com.esotericsoftware.kryo.io.Output so that kryo
+ * serialization can directly write to this stream without using an
+ * additional buffer, providing a faster serialization.
+ *
+ * Users of this class has to explicitly close the stream to avoid style check
+ * errors even though close is no-op when the underlying stream is not set.
  */
-public class UnsafeByteArrayOutputStream extends OutputStream
+public class UnsafeByteArrayOutputStream extends Output
   implements ExtendedDataOutput {
   static {
     try {
@@ -61,11 +69,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   private static final long BYTE_ARRAY_OFFSET  =
       UNSAFE.arrayBaseOffset(byte[].class);
 
-  /** Byte buffer */
-  private byte[] buf;
-  /** Position in the buffer */
-  private int pos = 0;
-
   /**
    * Constructor
    */
@@ -79,7 +82,8 @@ public class UnsafeByteArrayOutputStream extends OutputStream
    * @param size Initial size of the underlying byte array
    */
   public UnsafeByteArrayOutputStream(int size) {
-    buf = new byte[size];
+    buffer = new byte[size];
+    capacity = size;
   }
 
   /**
@@ -89,10 +93,11 @@ public class UnsafeByteArrayOutputStream extends OutputStream
    */
   public UnsafeByteArrayOutputStream(byte[] buf) {
     if (buf == null) {
-      this.buf = new byte[DEFAULT_BYTES];
+      this.buffer = new byte[DEFAULT_BYTES];
     } else {
-      this.buf = buf;
+      this.buffer = buf;
     }
+    capacity = this.buffer.length;
   }
 
   /**
@@ -103,7 +108,7 @@ public class UnsafeByteArrayOutputStream extends OutputStream
    */
   public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
     this(buf);
-    this.pos = pos;
+    this.position = pos;
   }
 
   /**
@@ -112,148 +117,161 @@ public class UnsafeByteArrayOutputStream extends OutputStream
    *
    * @param size Size to add
    */
-  private void ensureSize(int size) {
-    if (pos + size > buf.length) {
-      byte[] newBuf = new byte[(buf.length + size) << 1];
-      System.arraycopy(buf, 0, newBuf, 0, pos);
-      buf = newBuf;
+  @Override
+  protected boolean require(int size) {
+    if (position + size > buffer.length) {
+      byte[] newBuf = new byte[(buffer.length + size) << 1];
+      System.arraycopy(buffer, 0, newBuf, 0, position);
+      buffer = newBuf;
+      capacity = buffer.length;
+      return true;
     }
+    return false;
   }
 
   @Override
   public byte[] getByteArray() {
-    return buf;
+    return buffer;
   }
 
   @Override
   public byte[] toByteArray() {
-    return Arrays.copyOf(buf, pos);
+    return Arrays.copyOf(buffer, position);
   }
 
   @Override
   public byte[] toByteArray(int offset, int length) {
-    if (offset + length > pos) {
+    if (offset + length > position) {
       throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
-          "Length: %d exceeds the size of buf : %d", offset, length, pos));
+        "Length: %d exceeds the size of buffer : %d",
+            offset, length, position));
     }
-    return Arrays.copyOfRange(buf, offset, length);
+    return Arrays.copyOfRange(buffer, offset, length);
   }
 
   @Override
   public void reset() {
-    pos = 0;
+    position = 0;
   }
 
   @Override
   public int getPos() {
-    return pos;
+    return position;
   }
 
   @Override
-  public void write(int b) throws IOException {
-    ensureSize(SIZE_OF_BYTE);
-    buf[pos] = (byte) b;
-    pos += SIZE_OF_BYTE;
+  public void write(int b) {
+    require(SIZE_OF_BYTE);
+    buffer[position] = (byte) b;
+    position += SIZE_OF_BYTE;
   }
 
   @Override
-  public void write(byte[] b) throws IOException {
-    ensureSize(b.length);
-    System.arraycopy(b, 0, buf, pos, b.length);
-    pos += b.length;
+  public void write(byte[] b) {
+    require(b.length);
+    System.arraycopy(b, 0, buffer, position, b.length);
+    position += b.length;
   }
 
   @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    ensureSize(len);
-    System.arraycopy(b, off, buf, pos, len);
-    pos += len;
+  public void write(byte[] b, int off, int len) {
+    require(len);
+    System.arraycopy(b, off, buffer, position, len);
+    position += len;
   }
 
   @Override
-  public void writeBoolean(boolean v) throws IOException {
-    ensureSize(SIZE_OF_BOOLEAN);
-    UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
-    pos += SIZE_OF_BOOLEAN;
+  public void writeBoolean(boolean v) {
+    require(SIZE_OF_BOOLEAN);
+    UNSAFE.putBoolean(buffer, BYTE_ARRAY_OFFSET + position, v);
+    position += SIZE_OF_BOOLEAN;
   }
 
   @Override
-  public void writeByte(int v) throws IOException {
-    ensureSize(SIZE_OF_BYTE);
-    UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
-    pos += SIZE_OF_BYTE;
+  public void writeByte(int v) {
+    require(SIZE_OF_BYTE);
+    UNSAFE.putByte(buffer, BYTE_ARRAY_OFFSET + position, (byte) v);
+    position += SIZE_OF_BYTE;
   }
 
   @Override
-  public void writeShort(int v) throws IOException {
-    ensureSize(SIZE_OF_SHORT);
-    UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
-    pos += SIZE_OF_SHORT;
+  public void writeShort(int v) {
+    require(SIZE_OF_SHORT);
+    UNSAFE.putShort(buffer, BYTE_ARRAY_OFFSET + position, (short) v);
+    position += SIZE_OF_SHORT;
   }
 
   @Override
   public void writeChar(int v) throws IOException {
-    ensureSize(SIZE_OF_CHAR);
-    UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
-    pos += SIZE_OF_CHAR;
+    require(SIZE_OF_CHAR);
+    UNSAFE.putChar(buffer, BYTE_ARRAY_OFFSET + position, (char) v);
+    position += SIZE_OF_CHAR;
+  }
+
+  @Override
+  public void writeChar(char v) {
+    require(SIZE_OF_CHAR);
+    UNSAFE.putChar(buffer, BYTE_ARRAY_OFFSET + position, v);
+    position += SIZE_OF_CHAR;
   }
 
   @Override
-  public void writeInt(int v) throws IOException {
-    ensureSize(SIZE_OF_INT);
-    UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
-    pos += SIZE_OF_INT;
+  public void writeInt(int v) {
+    require(SIZE_OF_INT);
+    UNSAFE.putInt(buffer, BYTE_ARRAY_OFFSET + position, v);
+    position += SIZE_OF_INT;
   }
 
   @Override
   public void ensureWritable(int minSize) {
-    if ((pos + minSize) > buf.length) {
-      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
+    if ((position + minSize) > buffer.length) {
+      buffer = Arrays.copyOf(buffer,
+                Math.max(buffer.length << 1, position + minSize));
     }
   }
 
   @Override
   public void skipBytes(int bytesToSkip) {
     ensureWritable(bytesToSkip);
-    pos += bytesToSkip;
+    position += bytesToSkip;
   }
 
   @Override
   public void writeInt(int pos, int value) {
-    if (pos + SIZE_OF_INT > this.pos) {
+    if (pos + SIZE_OF_INT > this.position) {
       throw new IndexOutOfBoundsException(
           "writeInt: Tried to write int to position " + pos +
-              " but current length is " + this.pos);
+              " but current length is " + this.position);
     }
-    UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+    UNSAFE.putInt(buffer, BYTE_ARRAY_OFFSET + pos, value);
   }
 
   @Override
-  public void writeLong(long v) throws IOException {
-    ensureSize(SIZE_OF_LONG);
-    UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
-    pos += SIZE_OF_LONG;
+  public void writeLong(long v) {
+    require(SIZE_OF_LONG);
+    UNSAFE.putLong(buffer, BYTE_ARRAY_OFFSET + position, v);
+    position += SIZE_OF_LONG;
   }
 
   @Override
-  public void writeFloat(float v) throws IOException {
-    ensureSize(SIZE_OF_FLOAT);
-    UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
-    pos += SIZE_OF_FLOAT;
+  public void writeFloat(float v) {
+    require(SIZE_OF_FLOAT);
+    UNSAFE.putFloat(buffer, BYTE_ARRAY_OFFSET + position, v);
+    position += SIZE_OF_FLOAT;
   }
 
   @Override
-  public void writeDouble(double v) throws IOException {
-    ensureSize(SIZE_OF_DOUBLE);
-    UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
-    pos += SIZE_OF_DOUBLE;
+  public void writeDouble(double v) {
+    require(SIZE_OF_DOUBLE);
+    UNSAFE.putDouble(buffer, BYTE_ARRAY_OFFSET + position, v);
+    position += SIZE_OF_DOUBLE;
   }
 
   @Override
   public void writeBytes(String s) throws IOException {
     // Note that this code is mostly copied from DataOutputStream
     int len = s.length();
-    ensureSize(len);
+    require(len);
     for (int i = 0; i < len; i++) {
       int v = s.charAt(i);
       writeByte(v);
@@ -264,7 +282,7 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   public void writeChars(String s) throws IOException {
     // Note that this code is mostly copied from DataOutputStream
     int len = s.length();
-    ensureSize(len * SIZE_OF_CHAR);
+    require(len * SIZE_OF_CHAR);
     for (int i = 0; i < len; i++) {
       int v = s.charAt(i);
       writeChar(v);
@@ -295,7 +313,7 @@ public class UnsafeByteArrayOutputStream extends OutputStream
           "encoded string too long: " + utflen + " bytes");
     }
 
-    ensureSize(utflen + SIZE_OF_SHORT);
+    require(utflen + SIZE_OF_SHORT);
     writeShort(utflen);
 
     int i = 0;
@@ -304,21 +322,21 @@ public class UnsafeByteArrayOutputStream extends OutputStream
       if (!((c >= 0x0001) && (c <= 0x007F))) {
         break;
       }
-      buf[pos++] = (byte) c;
+      buffer[position++] = (byte) c;
     }
 
     for (; i < strlen; i++) {
       c = s.charAt(i);
       if ((c >= 0x0001) && (c <= 0x007F)) {
-        buf[pos++] = (byte) c;
+        buffer[position++] = (byte) c;
 
       } else if (c > 0x07FF) {
-        buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-        buf[pos++] = (byte) (0x80 | ((c >>  6) & 0x3F));
-        buf[pos++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+        buffer[position++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+        buffer[position++] = (byte) (0x80 | ((c >>  6) & 0x3F));
+        buffer[position++] = (byte) (0x80 | ((c >>  0) & 0x3F));
       } else {
-        buf[pos++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
-        buf[pos++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+        buffer[position++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+        buffer[position++] = (byte) (0x80 | ((c >>  0) & 0x3F));
       }
     }
   }
index 39ab352..4053ca6 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.utils;
 
+import com.esotericsoftware.kryo.io.Input;
+
 import java.io.IOException;
 import java.io.UTFDataFormatException;
 
@@ -25,12 +27,7 @@ import java.io.UTFDataFormatException;
  * Byte array input stream that uses Unsafe methods to deserialize
  * much faster
  */
-public abstract class UnsafeReads implements ExtendedDataInput {
-
-  /** Buffer length */
-  protected int bufLength;
-  /** Position in the buffer */
-  protected long pos = 0;
+public abstract class UnsafeReads extends Input implements ExtendedDataInput {
 
   /**
    * Constructor
@@ -38,7 +35,7 @@ public abstract class UnsafeReads implements ExtendedDataInput {
    * @param length buf length
    */
   public UnsafeReads(int length) {
-    bufLength = length;
+    limit = length;
   }
 
   /**
@@ -48,8 +45,8 @@ public abstract class UnsafeReads implements ExtendedDataInput {
    * @param length buf length
    */
   public UnsafeReads(long offset, int length) {
-    pos = offset;
-    bufLength = length;
+    position = (int) offset;
+    limit = length;
   }
 
   /**
@@ -72,17 +69,19 @@ public abstract class UnsafeReads implements ExtendedDataInput {
    * @param requiredBytes Bytes required to read
    * @throws IOException When there are not enough bytes to read
    */
-  protected void ensureRemaining(int requiredBytes) throws IOException {
+  @Override
+  protected int require(int requiredBytes) {
     if (available() < requiredBytes) {
-      throw new IOException("ensureRemaining: Only " + available() +
-          " bytes remaining, trying to read " + requiredBytes);
+      throw new IndexOutOfBoundsException("require: Only " +
+          available() + " bytes remaining, trying to read " + requiredBytes);
     }
+    return available();
   }
 
   @Override
-  public int skipBytes(int n) throws IOException {
-    ensureRemaining(n);
-    pos += n;
+  public int skipBytes(int n) {
+    require(n);
+    position += n;
     return n;
   }
 
@@ -105,7 +104,7 @@ public abstract class UnsafeReads implements ExtendedDataInput {
       case '\r':
         int c2 = readByte();
         if ((c2 != '\n') && (c2 != -1)) {
-          pos -= 1;
+          position -= 1;
         }
         break loop;
       default:
index a75815a..679119f 100644 (file)
@@ -39,8 +39,8 @@ public class UnsafeReusableByteArrayInput extends UnsafeArrayReads {
    * @param length length of the valid data
    */
   public void initialize(byte[] buf, int offset, int length) {
-    this.buf = buf;
-    this.pos = offset;
-    this.bufLength = length;
+    this.buffer = buf;
+    this.position = offset;
+    this.limit = length;
   }
 }
index c8251b1..d0a68f3 100644 (file)
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.utils.io;
 
-import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 import org.apache.giraph.utils.ExtendedDataInput;
 import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,7 +36,7 @@ import java.util.List;
 public class BigDataInput implements ExtendedDataInput {
   /** Empty data input */
   private static final ExtendedDataInput EMPTY_INPUT =
-      new ExtendedByteArrayDataInput(new byte[0]);
+      new UnsafeByteArrayInputStream(new byte[0]);
 
   /** Input which we are currently reading from */
   private ExtendedDataInput currentInput;