GIRAPH-1185
authorMaja Kabiljo <majakabiljo@fb.com>
Thu, 12 Apr 2018 15:59:13 +0000 (08:59 -0700)
committerMaja Kabiljo <majakabiljo@fb.com>
Thu, 12 Apr 2018 16:04:14 +0000 (09:04 -0700)
closes #69

giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWritable.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoSimpleTests.java [new file with mode: 0644]

index 9618822..8b3c368 100644 (file)
@@ -92,12 +92,12 @@ public final class BlockWorkerContext extends WorkerContext
 
   @Override
   public void write(DataOutput out) throws IOException {
-    HadoopKryo.writeClassAndObject(out, workerLogic);
+    HadoopKryo.writeClassAndObj(out, workerLogic);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    workerLogic = HadoopKryo.readClassAndObject(in);
+    workerLogic = HadoopKryo.readClassAndObj(in);
     workerLogic.getOutputHandle().initialize(getConf(), getContext());
   }
 }
index 53c8adc..b110101 100644 (file)
@@ -26,11 +26,15 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 
+import com.esotericsoftware.kryo.util.DefaultClassResolver;
 import org.apache.giraph.conf.GiraphConfigurationSettable;
+import com.esotericsoftware.kryo.ClassResolver;
+import com.esotericsoftware.kryo.ReferenceResolver;
+import com.esotericsoftware.kryo.util.MapReferenceResolver;
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
 import org.apache.giraph.types.ops.collections.BasicSet;
-import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
 import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
 import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
 import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
 import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
@@ -69,6 +73,9 @@ import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
  * It extends Kryo to reuse KryoPool functionality, but have additional needed
  * objects cached as well. If we move to ThreadLocal or other caching
  * technique, we can use composition, instead of inheritance here.
+ *
+ * TODO: Refactor this class into two separate classes depending on
+ * whether the reference tracking is enabled or disabled.
  */
 public class HadoopKryo extends Kryo {
   /** Pool of reusable Kryo objects, since they are expensive to create */
@@ -76,9 +83,16 @@ public class HadoopKryo extends Kryo {
       new KryoFactory() {
         @Override
         public Kryo create() {
-          return createKryo();
+          return createKryo(true, true);
         }
       }).build();
+  /** Thread local HadoopKryo object */
+  private static final ThreadLocal<HadoopKryo> KRYO =
+    new ThreadLocal<HadoopKryo>() {
+      @Override protected HadoopKryo initialValue() {
+        return  createKryo(false, false);
+      }
+    };
 
   /**
    * List of interfaces/parent classes that will not be allowed to be
@@ -111,19 +125,15 @@ public class HadoopKryo extends Kryo {
         "Logger must be a static field");
   }
 
-  // Use chunked streams, so within same stream we can use both kryo and
-  // non-kryo serialization.
   /** Reusable Input object */
-  private final InputChunked input = new InputChunked(4096);
+  private InputChunked input;
   /** Reusable Output object */
-  private final OutputChunked output = new OutputChunked(4096);
+  private OutputChunked output;
 
   /** Reusable DataInput wrapper stream */
-  private final DataInputWrapperStream dataInputWrapperStream =
-      new DataInputWrapperStream();
+  private DataInputWrapperStream dataInputWrapperStream;
   /** Reusable DataOutput wrapper stream */
-  private final DataOutputWrapperStream dataOutputWrapperStream =
-      new DataOutputWrapperStream();
+  private DataOutputWrapperStream dataOutputWrapperStream;
 
   /**
    * Map of already initialized serializers used
@@ -136,17 +146,27 @@ public class HadoopKryo extends Kryo {
   private HadoopKryo() {
   }
 
+  /**
+   * Constructor that takes custom class resolver and reference resolver.
+   * @param classResolver Class resolver
+   * @param referenceResolver Reference resolver
+   */
+  private HadoopKryo(ClassResolver classResolver,
+                     ReferenceResolver referenceResolver) {
+    super(classResolver, referenceResolver);
+  }
+
   // Public API:
 
   /**
    * Write type of given object and the object itself to the output stream.
-   * Inverse of readClassAndObject.
+   * Inverse of readClassAndObj.
    *
    * @param out Output stream
    * @param object Object to write
    */
-  public static void writeClassAndObject(
-      final DataOutput out, final Object object) {
+  public static void writeClassAndObj(
+          final DataOutput out, final Object object) {
     writeInternal(out, object, false);
   }
 
@@ -159,7 +179,7 @@ public class HadoopKryo extends Kryo {
    * @return Deserialized object
    * @param <T> Type of the object being read
    */
-  public static <T> T readClassAndObject(DataInput in) {
+  public static <T> T readClassAndObj(DataInput in) {
     return readInternal(in, null, false);
   }
 
@@ -187,6 +207,62 @@ public class HadoopKryo extends Kryo {
   }
 
   /**
+   * Writes class and object to specified output stream with specified
+   * Kryo object. It does not use interim buffers.
+   * @param kryo Kryo object
+   * @param out Output stream
+   * @param object Object
+   */
+  public static void writeWithKryo(
+          final HadoopKryo kryo, final Output out,
+          final Object object) {
+    kryo.writeClassAndObject(out, object);
+    out.close();
+  }
+
+  /**
+   * Write out of object with given kryo
+   * @param kryo Kryo object
+   * @param out Output
+   * @param object Object to write
+   */
+  public static void writeWithKryoOutOfObject(
+          final HadoopKryo kryo, final Output out,
+          final Object object) {
+    kryo.writeOutOfObject(out, object);
+    out.close();
+  }
+
+  /**
+   * Reads class and object from specified input stream with
+   * specified kryo object.
+   * it does not use interim buffers.
+   * @param kryo Kryo object
+   * @param in Input buffer
+   * @param <T> Object type parameter
+   * @return Object
+   */
+  public static <T> T readWithKryo(
+          final HadoopKryo kryo, final Input in) {
+    T object;
+    object = (T) kryo.readClassAndObject(in);
+    in.close();
+    return object;
+  }
+
+  /**
+   * Read into object with given kryo.
+   * @param kryo Kryo object
+   * @param in Input
+   * @param object Object to read into
+   */
+  public static void readWithKryoIntoObject(
+          final HadoopKryo kryo, final Input in, Object object) {
+    kryo.readIntoObject(in, object);
+    in.close();
+  }
+
+  /**
    * Create copy of the object, by magically recursively copying
    * all of it's fields, keeping reference structures (like cycles)
    *
@@ -203,15 +279,39 @@ public class HadoopKryo extends Kryo {
     });
   }
 
+  /**
+   * Returns a kryo which doesn't track objects, hence
+   * serialization of recursive/nested objects is not
+   * supported.
+   *
+   * Reference tracking significantly degrades the performance
+   * since kryo has to store all serialized objects and search
+   * the history to check if an object has been already serialized.
+   *
+   * @return Hadoop kryo which doesn't track objects.
+   */
+  public static HadoopKryo getNontrackingKryo() {
+    return KRYO.get();
+  }
+
   // Private implementation:
 
   /**
    * Create new instance of HadoopKryo, properly initialized.
-   *
-   * @return New HadoopKryo instnace
+   * @param trackReferences if true, object references are tracked.
+   * @param hasBuffer if true, an interim buffer is used.
+   * @return new HadoopKryo instance
    */
-  private static HadoopKryo createKryo() {
-    HadoopKryo kryo = new HadoopKryo();
+  private static HadoopKryo createKryo(boolean trackReferences,
+                                       boolean hasBuffer) {
+    HadoopKryo kryo;
+    if (trackReferences) {
+      kryo = new HadoopKryo();
+    } else {
+      // TODO: if trackReferences is false use custom class resolver.
+      kryo = new HadoopKryo(new DefaultClassResolver(),
+              new MapReferenceResolver());
+    }
 
     String version = System.getProperty("java.version");
     char minor = version.charAt(2);
@@ -296,6 +396,19 @@ public class HadoopKryo extends Kryo {
     kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
     kryo.setDefaultSerializer(customSerializerFactory);
 
+    if (hasBuffer) {
+      kryo.input = new InputChunked(4096);
+      kryo.output = new OutputChunked(4096);
+      kryo.dataInputWrapperStream = new DataInputWrapperStream();
+      kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
+    }
+
+    if (!trackReferences) {
+      kryo.setReferences(false);
+
+      // TODO: Enable the following when a custom class resolver is created.
+      // kryo.setAutoReset(false);
+    }
     return kryo;
   }
 
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java
new file mode 100644 (file)
index 0000000..9c5de74
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Generic wrapper object, making any object writable.
+ *
+ * Usage of this class is similar to KryoWritableWrapper but
+ * unlike KryoWritableWrapper, this class does not
+ * support recursive/nested objects to provide better
+ * performance.
+ *
+ * If the underlying stream is a kryo output stream than the read/write
+ * happens with a kryo object that doesn't track references, providing
+ * significantly better performance.
+ *
+ * @param <T> Object type
+ */
+public class KryoSimpleWrapper<T> implements Writable {
+
+  /** Wrapped object */
+  private T object;
+
+  /**
+   * Create wrapper given an object.
+   * @param object Object instance
+   */
+  public KryoSimpleWrapper(T object) {
+    this.object = object;
+  }
+
+  /**
+   * Creates wrapper initialized with null.
+   */
+  public KryoSimpleWrapper() {
+  }
+
+  /**
+   * Unwrap the object value
+   * @return Object value
+   */
+  public T get() {
+    return object;
+  }
+
+  /**
+   * Set wrapped object value
+   * @param object New object value
+   */
+  public void set(T object) {
+    this.object = object;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws java.io.IOException {
+    if (in instanceof Input) {
+      Input inp = (Input) in;
+      object = HadoopKryo.readWithKryo(HadoopKryo.getNontrackingKryo(), inp);
+    } else {
+      object = HadoopKryo.readClassAndObj(in);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (out instanceof Output) {
+      Output outp = (Output) out;
+      HadoopKryo.writeWithKryo(HadoopKryo.getNontrackingKryo(), outp, object);
+    } else {
+      HadoopKryo.writeClassAndObj(out, object);
+    }
+  }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWritable.java
new file mode 100644 (file)
index 0000000..1a79555
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
+
+/**
+ * Class which you can extend to get all serialization/deserialization
+ * done automagically.
+ *
+ * Usage of this class is similar to KryoWritable but
+ * unlike KryoWritable, this class does not
+ * support recursive/nested objects to provide better
+ * performance.
+ *
+ * If the underlying stream is a kryo output stream than the read/write
+ * happens with a kryo object that doesn't track references, providing
+ * significantly better performance.
+ *
+ */
+public abstract class KryoSimpleWritable implements KryoIgnoreWritable {
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    if (out instanceof Output) {
+      Output outp = (Output) out;
+      HadoopKryo.writeWithKryoOutOfObject(
+              HadoopKryo.getNontrackingKryo(), outp, this);
+    } else {
+      HadoopKryo.writeOutOfObject(out, this);
+    }
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    if (in instanceof Input) {
+      Input inp = (Input) in;
+      HadoopKryo.readWithKryoIntoObject(
+              HadoopKryo.getNontrackingKryo(), inp, this);
+    } else {
+      HadoopKryo.readIntoObject(in, this);
+    }
+  }
+}
index d80a9a7..5da0973 100644 (file)
@@ -71,12 +71,12 @@ public class KryoWritableWrapper<T> implements Writable {
 
   @Override
   public void readFields(DataInput in) throws java.io.IOException {
-    object = HadoopKryo.readClassAndObject(in);
+    object = HadoopKryo.readClassAndObj(in);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    HadoopKryo.writeClassAndObject(out, object);
+    HadoopKryo.writeClassAndObj(out, object);
   }
 
   /**
diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoSimpleTests.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoSimpleTests.java
new file mode 100644 (file)
index 0000000..3b09964
--- /dev/null
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Random;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/**
+ * Tests some subtle cases of kryo serialization.
+ */
+public class KryoSimpleTests {
+   interface BasicProperties {
+     boolean getBoolProp();
+     void setBoolProp(boolean val);
+     byte getByteProp();
+     void setByteProp(byte val);
+     short getShortProp();
+     void setShortProp(short val);
+     char getCharProp();
+     void setCharProp(char val);
+     int getIntProp();
+     void setIntProp(int val);
+     long getLongProp();
+     void setLongProp(long val);
+     float getFloatProp();
+     void setFloatProp(float val);
+     double getDoubleProp() ;
+     void setDoubleProp(double val);
+     String getStrProp() ;
+     void setStrProp(String strProp);
+     List getListProp();
+     void setListProp(List val);
+     Set getSetProp();
+     void setSetProp(Set val);
+     Map getMapProp() ;
+     void setMapProp(Map val);
+  }
+  public static class TestClass implements BasicProperties{
+    boolean boolProp;
+    byte byteProp;
+    short shortProp;
+    char charProp;
+    int intProp;
+    long longProp;
+    float floatProp;
+    double doubleProp;
+    String strProp;
+    List listProp;
+    Set setProp;
+    Map mapProp;
+
+    public TestClass() {
+    }
+
+    public boolean getBoolProp() {
+      return boolProp;
+    }
+    public void setBoolProp(boolean val) {
+      boolProp = val;
+    }
+    public byte getByteProp() {
+      return byteProp;
+    }
+    public void setByteProp(byte val) {
+      byteProp = val;
+    }
+    public short getShortProp() {
+      return shortProp;
+    }
+    public void setShortProp(short val) {
+      shortProp = val;
+    }
+    public char getCharProp() {
+      return charProp;
+    }
+    public void setCharProp(char val) {
+      charProp = val;
+    }
+    public int getIntProp() {
+      return intProp;
+    }
+    public void setIntProp(int val) {
+      intProp = val;
+    }
+    public long getLongProp() {
+      return longProp;
+    }
+    public void setLongProp(long val) {
+      longProp = val;
+    }
+    public float getFloatProp() {
+      return floatProp;
+    }
+    public void setFloatProp(float val) {
+      floatProp = val;
+    }
+    public double getDoubleProp() {
+      return doubleProp;
+    }
+    public void setDoubleProp(double val) {
+      doubleProp = val;
+    }
+    public String getStrProp() {
+      return strProp;
+    }
+    public void setStrProp(String strProp) {
+      this.strProp = strProp;
+    }
+    public List getListProp() {
+      return this.listProp;
+    }
+    public void setListProp(List val) {
+      this.listProp = val;
+    }
+    public Set getSetProp() {
+      return setProp;
+    }
+    public void setSetProp(Set val) {
+      this.setProp = val;
+    }
+    public Map getMapProp() {
+      return mapProp;
+    }
+    public void setMapProp(Map val) {
+      this.mapProp = val;
+    }
+
+  }
+
+  public static class TestClassFromWritable extends KryoSimpleWritable implements BasicProperties {
+    boolean boolProp;
+    byte byteProp;
+    short shortProp;
+    char charProp;
+    int intProp;
+    long longProp;
+    float floatProp;
+    double doubleProp;
+    String strProp;
+    List listProp;
+    Set setProp;
+    Map mapProp;
+
+    public TestClassFromWritable() {
+    }
+    public boolean getBoolProp() {
+      return boolProp;
+    }
+    public void setBoolProp(boolean val) {
+      boolProp = val;
+    }
+    public byte getByteProp() {
+      return byteProp;
+    }
+    public void setByteProp(byte val) {
+      byteProp = val;
+    }
+    public short getShortProp() {
+      return shortProp;
+    }
+    public void setShortProp(short val) {
+      shortProp = val;
+    }
+    public char getCharProp() {
+      return charProp;
+    }
+    public void setCharProp(char val) {
+      charProp = val;
+    }
+    public int getIntProp() {
+      return intProp;
+    }
+    public void setIntProp(int val) {
+      intProp = val;
+    }
+    public long getLongProp() {
+      return longProp;
+    }
+    public void setLongProp(long val) {
+      longProp = val;
+    }
+    public float getFloatProp() {
+      return floatProp;
+    }
+    public void setFloatProp(float val) {
+      floatProp = val;
+    }
+    public double getDoubleProp() {
+      return doubleProp;
+    }
+    public void setDoubleProp(double val) {
+      doubleProp = val;
+    }
+    public String getStrProp() {
+      return strProp;
+    }
+    public void setStrProp(String strProp) {
+      this.strProp = strProp;
+    }
+    public List getListProp() {
+      return this.listProp;
+    }
+    public void setListProp(List val) {
+      this.listProp = val;
+    }
+    public Set getSetProp() {
+      return setProp;
+    }
+    public void setSetProp(Set val) {
+      this.setProp = val;
+    }
+    public Map getMapProp() {
+      return mapProp;
+    }
+    public void setMapProp(Map val) {
+      this.mapProp = val;
+    }
+
+  }
+
+  public static class TestClassFromKryoWritable extends KryoWritable implements BasicProperties {
+    boolean boolProp;
+    byte byteProp;
+    short shortProp;
+    char charProp;
+    int intProp;
+    long longProp;
+    float floatProp;
+    double doubleProp;
+    String strProp;
+    List listProp;
+    Set setProp;
+    Map mapProp;
+
+    public TestClassFromKryoWritable() {
+    }
+    public boolean getBoolProp() {
+      return boolProp;
+    }
+    public void setBoolProp(boolean val) {
+      boolProp = val;
+    }
+    public byte getByteProp() {
+      return byteProp;
+    }
+    public void setByteProp(byte val) {
+      byteProp = val;
+    }
+    public short getShortProp() {
+      return shortProp;
+    }
+    public void setShortProp(short val) {
+      shortProp = val;
+    }
+    public char getCharProp() {
+      return charProp;
+    }
+    public void setCharProp(char val) {
+      charProp = val;
+    }
+    public int getIntProp() {
+      return intProp;
+    }
+    public void setIntProp(int val) {
+      intProp = val;
+    }
+    public long getLongProp() {
+      return longProp;
+    }
+    public void setLongProp(long val) {
+      longProp = val;
+    }
+    public float getFloatProp() {
+      return floatProp;
+    }
+    public void setFloatProp(float val) {
+      floatProp = val;
+    }
+    public double getDoubleProp() {
+      return doubleProp;
+    }
+    public void setDoubleProp(double val) {
+      doubleProp = val;
+    }
+    public String getStrProp() {
+      return strProp;
+    }
+    public void setStrProp(String strProp) {
+      this.strProp = strProp;
+    }
+    public List getListProp() {
+      return this.listProp;
+    }
+    public void setListProp(List val) {
+      this.listProp = val;
+    }
+    public Set getSetProp() {
+      return setProp;
+    }
+    public void setSetProp(Set val) {
+      this.setProp = val;
+    }
+    public Map getMapProp() {
+      return mapProp;
+    }
+    public void setMapProp(Map val) {
+      this.mapProp = val;
+    }
+  }
+
+  static int collectionSize = 100000;
+  private static BasicProperties populateTestClass(BasicProperties testClass) {
+    Random rand = new Random();
+    BasicProperties reference = new TestClass();
+     reference.setBoolProp(rand.nextBoolean());
+     byte [] byteArr = new byte[1];
+    rand.nextBytes(byteArr);
+    reference.setByteProp(byteArr[0]);
+    reference.setCharProp((char)rand.nextInt((int)Character.MAX_VALUE));
+    reference.setIntProp(rand.nextInt(Integer.MAX_VALUE));
+    reference.setShortProp((short)rand.nextInt((int)Short.MAX_VALUE));
+    reference.setLongProp(rand.nextLong());
+    reference.setFloatProp(rand.nextFloat());
+    reference.setDoubleProp(rand.nextDouble());
+    reference.setStrProp(RandomStringUtils.randomAlphabetic(10));
+
+    List<Integer> list = new ArrayList<>();
+    for (int i=0; i < collectionSize ; i++) {
+      list.add(rand.nextInt(Integer.MAX_VALUE));
+    }
+    reference.setListProp(list);
+
+    Set<Integer> set = new HashSet<>();
+    for (int i=0; i < collectionSize; i++) {
+      set.add(rand.nextInt(Integer.MAX_VALUE));
+    }
+    reference.setSetProp(set);
+
+    Map<Integer, Integer> map = new HashMap<>();
+    for (int i=0; i < collectionSize; i++) {
+      map.put(i, rand.nextInt(Integer.MAX_VALUE));
+    }
+    reference.setMapProp(map);
+    popuateTestFrom(reference, testClass);
+    return reference;
+  }
+
+  private static void popuateTestFrom(BasicProperties from, BasicProperties to) {
+     to.setStrProp(from.getStrProp());
+     to.setDoubleProp(from.getDoubleProp());
+     to.setFloatProp(from.getFloatProp());
+     to.setLongProp(from.getLongProp());
+     to.setIntProp(from.getIntProp());
+     to.setCharProp(from.getCharProp());
+     to.setByteProp(from.getByteProp());
+     to.setBoolProp(from.getBoolProp());
+     to.setShortProp(from.getShortProp());
+     to.setMapProp(from.getMapProp());
+     to.setSetProp(from.getSetProp());
+     to.setListProp(from.getListProp());
+  }
+
+  private static void verifyTestClass(BasicProperties testClass, BasicProperties referenceClass) {
+     assertEquals(testClass.getBoolProp(), referenceClass.getBoolProp());
+     assertEquals(testClass.getByteProp(), referenceClass.getByteProp());
+     assertEquals(testClass.getCharProp(), referenceClass.getCharProp());
+     assertEquals(testClass.getDoubleProp(), referenceClass.getDoubleProp(), 0.0001);
+     assertEquals(testClass.getFloatProp(), referenceClass.getFloatProp(), 0.0001);
+     assertEquals(testClass.getIntProp(), referenceClass.getIntProp());
+     assertEquals(testClass.getLongProp(), referenceClass.getLongProp());
+     assertEquals(testClass.getShortProp(), referenceClass.getShortProp());
+     assertEquals(testClass.getStrProp(), referenceClass.getStrProp());
+     assertTrue(testClass.getListProp().equals(referenceClass.getListProp()));
+     assertTrue(testClass.getSetProp().equals(referenceClass.getSetProp()));
+     assertTrue(testClass.getMapProp().equals(referenceClass.getMapProp()));
+  }
+
+  @Test
+  public void testClassFromWritable() {
+    TestClassFromWritable writableClass = new TestClassFromWritable();
+    TestClassFromWritable res = new TestClassFromWritable();
+    BasicProperties reference = populateTestClass(writableClass);
+    WritableUtils.copyInto(
+            writableClass, res, true);
+    verifyTestClass(res,reference);
+  }
+
+  @Test
+  public void testKryoSimpleWrapper() {
+    TestClass testClass = new TestClass();
+    BasicProperties reference = populateTestClass(testClass);
+    TestClass res = WritableUtils.createCopy(new KryoSimpleWrapper<>(testClass)).get();
+    verifyTestClass(res, reference);
+  }
+}