TEZ-3942. RPC getTask writable optimization invalid in hadoop 2.8+
authorNishant Dash <nishant.dash@oath.com>
Tue, 24 Jul 2018 15:14:53 +0000 (10:14 -0500)
committerJason Lowe <jlowe@apache.org>
Tue, 24 Jul 2018 15:44:02 +0000 (10:44 -0500)
Signed-off-by: Jason Lowe <jlowe@apache.org>
tez-api/src/main/java/org/apache/tez/common/TezUtils.java
tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
tez-api/src/main/java/org/apache/tez/dag/api/UserPayload.java
tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java

index aed9e0f..072c02f 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.tez.common;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +98,7 @@ public class TezUtils {
    * @throws java.io.IOException
    */
   public static UserPayload createUserPayloadFromConf(Configuration conf) throws IOException {
-    return UserPayload.create(createByteStringFromConf(conf).asReadOnlyByteBuffer());
+    return UserPayload.create(ByteBuffer.wrap(createByteStringFromConf(conf).toByteArray()));
   }
 
   /**
index c5d9c0b..acc5f12 100644 (file)
@@ -735,7 +735,7 @@ public class DagTypeConverters {
     if (payload == null) {
       return null;
     }
-    return payload.getPayload();
+    return payload.getRawPayload();
   }
 
   public static VertexExecutionContextProto convertToProto(
index dcc4ebf..13d4a93 100644 (file)
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -94,36 +95,40 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
     return this.className;
   }
 
+  void writeSingular(DataOutput out, ByteBuffer bb) throws IOException {
+    out.write(bb.array(), 0, bb.array().length);
+  }
+
+  void writeSegmented(DataOutput out, ByteBuffer bb) throws IOException {
+    // This code is just for fallback in case serialization is changed to
+    // use something other than DataOutputBuffer.
+    int len;
+    byte[] buf = new byte[SERIALIZE_BUFFER_SIZE];
+    do {
+      len = Math.min(bb.remaining(), SERIALIZE_BUFFER_SIZE);
+      bb.get(buf, 0, len);
+      out.write(buf, 0, len);
+    } while (bb.remaining() > 0);
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, className);
     // TODO: TEZ-305 - using protobuf serde instead of Writable serde.
     ByteBuffer bb = DagTypeConverters.convertFromTezUserPayload(userPayload);
-    if (bb == null) {
+    if (bb == null || bb.remaining() == 0) {
       out.writeInt(-1);
+      return;
+    }
+
+    // write size
+    out.writeInt(bb.remaining());
+    if (bb.hasArray()) {
+      writeSingular(out, bb);
     } else {
-      int size = bb.remaining();
-      if (size == 0) {
-        out.writeInt(-1);
-      } else {
-        out.writeInt(size);
-        if (out instanceof DataOutputBuffer) {
-          DataOutputBuffer buf = (DataOutputBuffer) out;
-          buf.write(new ByteBufferDataInput(bb), size);
-        } else {
-          // This code is just for fallback in case serialization is changed to
-          // use something other than DataOutputBuffer.
-          int len;
-          byte[] buf = new byte[SERIALIZE_BUFFER_SIZE];
-          do {
-            len = Math.min(bb.remaining(), SERIALIZE_BUFFER_SIZE);
-            bb.get(buf, 0, len);
-            out.write(buf, 0, len);
-          } while (bb.remaining() > 0);
-        }
-      }
-      out.writeInt(userPayload.getVersion());
+      writeSegmented(out, bb);
     }
+    out.writeInt(userPayload.getVersion());
   }
 
   @Override
@@ -144,76 +149,4 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
         userPayload == null ? false : userPayload.getPayload() == null ? false : true;
     return "ClassName=" + className + ", hasPayload=" + hasPayload;
   }
-
-  private static class ByteBufferDataInput implements DataInput {
-
-    private final ByteBuffer bb;
-
-    public ByteBufferDataInput(ByteBuffer bb) {
-      this.bb = bb;
-    }
-
-    @Override
-    public void readFully(byte[] b) throws IOException {
-      bb.get(b, 0, bb.remaining());
-    }
-
-    @Override
-    public void readFully(byte[] b, int off, int len) throws IOException {
-      bb.get(b, off, len);
-    }
-
-    @Override
-    public int skipBytes(int n) throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public boolean readBoolean() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public byte readByte() throws IOException {
-      return bb.get();
-    }
-    @Override
-    public int readUnsignedByte() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public short readShort() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public int readUnsignedShort() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public char readChar() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public int readInt() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public long readLong() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public float readFloat() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public double readDouble() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public String readLine() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-    @Override
-    public String readUTF() throws IOException {
-      throw new UnsupportedOperationException();
-    }
-  }
 }
index fa617b5..087b17a 100644 (file)
@@ -63,6 +63,17 @@ public final class UserPayload {
   }
 
   /**
+   * Return the payload as a ByteBuffer.
+   * @return ByteBuffer.
+   */
+  @Nullable
+  public ByteBuffer getRawPayload() {
+    // Note: Several bits of serialization, including deepCopyAsArray depend on a new instance of the
+    // ByteBuffer being returned, since they modify it. If changing this code to return the same
+    // ByteBuffer - deepCopyAsArray and TezEntityDescriptor need to be looked at.
+    return payload == EMPTY_BYTE ? null : payload.duplicate();
+  }
+  /**
    * Return the payload as a read-only ByteBuffer.
    * @return read-only ByteBuffer.
    */
index 1e8a99d..606bf42 100644 (file)
@@ -23,41 +23,80 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.TezUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
 
 public class TestEntityDescriptor {
 
-  @Test
+  public void verifyResults(InputDescriptor entityDescriptor, InputDescriptor deserialized, UserPayload payload,
+                             String confVal) throws IOException {
+    Assert.assertEquals(entityDescriptor.getClassName(), deserialized.getClassName());
+    // History text is not serialized when sending to tasks
+    Assert.assertNull(deserialized.getHistoryText());
+    Assert.assertArrayEquals(payload.deepCopyAsArray(), deserialized.getUserPayload().deepCopyAsArray());
+    Configuration deserializedConf = TezUtils.createConfFromUserPayload(deserialized.getUserPayload());
+    Assert.assertEquals(confVal, deserializedConf.get("testKey"));
+  }
+
+  public void testSingularWrite(InputDescriptor entityDescriptor, InputDescriptor deserialized, UserPayload payload,
+                                String confVal) throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    entityDescriptor.write(out);
+    out.close();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(out.getData().length);
+    bos.write(out.getData());
+
+    Mockito.verify(entityDescriptor).writeSingular(eq(out), any(ByteBuffer.class));
+    deserialized.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
+    verifyResults(entityDescriptor, deserialized, payload, confVal);
+  }
+
+  public void testSegmentedWrite(InputDescriptor entityDescriptor, InputDescriptor deserialized, UserPayload payload,
+                                 String confVal) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bos);
+    entityDescriptor.write(out);
+    out.close();
+
+    Mockito.verify(entityDescriptor).writeSegmented(eq(out), any(ByteBuffer.class));
+    deserialized.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
+    verifyResults(entityDescriptor, deserialized, payload, confVal);
+  }
+
+  @Test (timeout=1000)
   public void testEntityDescriptorHadoopSerialization() throws IOException {
-    // This tests the alternate serialization code path
-    // if the DataOutput is not DataOutputBuffer
+     /* This tests the alternate serialization code path
+     * if the DataOutput is not DataOutputBuffer
+     * AND, if it indeed is, with a read/write payload */
     Configuration conf = new Configuration(true);
     String confVal = RandomStringUtils.random(10000, true, true);
     conf.set("testKey", confVal);
     UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+
+    InputDescriptor deserialized = InputDescriptor.create("dummy");
     InputDescriptor entityDescriptor =
         InputDescriptor.create("inputClazz").setUserPayload(payload)
-        .setHistoryText("Bar123");
+                .setHistoryText("Bar123");
+    InputDescriptor entityDescriptorLivingInFear = spy(entityDescriptor);
 
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bos);
-    entityDescriptor.write(out);
-    out.close();
+    testSingularWrite(entityDescriptorLivingInFear, deserialized, payload, confVal);
 
-    InputDescriptor deserialized = InputDescriptor.create("dummy");
-    deserialized.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
-
-    Assert.assertEquals(entityDescriptor.getClassName(), deserialized.getClassName());
-    // History text is not serialized when sending to tasks
-    Assert.assertNull(deserialized.getHistoryText());
-    Assert.assertArrayEquals(payload.deepCopyAsArray(), deserialized.getUserPayload().deepCopyAsArray());
-    Configuration deserializedConf = TezUtils.createConfFromUserPayload(deserialized.getUserPayload());
-    Assert.assertEquals(confVal, deserializedConf.get("testKey"));
+    /* make read-only payload */
+    payload =  UserPayload.create(payload.getPayload());
+    entityDescriptor = InputDescriptor.create("inputClazz").setUserPayload(payload)
+                      .setHistoryText("Bar123");
+    entityDescriptorLivingInFear = spy(entityDescriptor);
+    testSegmentedWrite(entityDescriptorLivingInFear, deserialized, payload, confVal);
   }
-
 }