SAMZA-1370; Memory leak in CachedStore when using ByteBufferSerde as key Serde
authorPrateek Maheshwari <pmaheshw@linkedin.com>
Thu, 27 Jul 2017 22:10:43 +0000 (15:10 -0700)
committerJagadish <jagadish@apache.org>
Thu, 27 Jul 2017 22:10:43 +0000 (15:10 -0700)
ByteBufferSerde uses relative bulk get to serialize the provided ByteBuffer which changes its internal position. ByteBuffer's `equals` and `hashCode` depend upon its remaining elements, i.e. on its position. This means that when using ByteBuffers as keys in the CachedStore, flushing cache contents to the underlying store changes their hashCode. Since the hashCode for the key no longer matches the one used when inserting it into the map, the LinkedHashMap cannot correctly evict or remove these entries, leading to a memory leak.

Changing ByteBufferSerde to duplicate the provided ByteBuffer before copying should fix this issue. Prefer this over using absolute gets since there's no bulk absolute get API for ByteBuffer.

Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #251 from prateekm/bytebufferserde

docs/learn/documentation/versioned/jobs/configuration-table.html
samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala

index 3263856..bdf477a 100644 (file)
                         <dl>
                             <dt><code>org.apache.samza.serializers.ByteSerdeFactory</code></dt>
                             <dd>A no-op serde which passes through the undecoded byte array.</dd>
+                            <dt><code>org.apache.samza.serializers.ByteBufferSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.nio.ByteBuffer</code> objects.</dd>
                             <dt><code>org.apache.samza.serializers.IntegerSerdeFactory</code></dt>
                             <dd>Encodes <code>java.lang.Integer</code> objects as binary (4 bytes fixed-length big-endian encoding).</dd>
                             <dt><code>org.apache.samza.serializers.StringSerdeFactory</code></dt>
                             <dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt>
                             <dd>Encodes <code>java.lang.Long</code> as binary (8 bytes fixed-length big-endian encoding).</dd>
                             <dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt>
-                            <dd>Encodes <code>java.lang.Double</code> as binray (8 bytes double-precision float point).</dd>
+                            <dd>Encodes <code>java.lang.Double</code> as binary (8 bytes double-precision float point).</dd>
+                            <dt><code>org.apache.samza.serializers.UUIDSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.util.UUID</code> objects.</dd>
+                            <dt><code>org.apache.samza.serializers.SerializableSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.io.Serializable</code> objects.</dd>
                             <dt><code>org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code></dt>
                             <dd>Encodes <code>org.apache.samza.metrics.reporter.MetricsSnapshot</code> objects (which are
                                 used for <a href="../container/metrics.html">reporting metrics</a>) as JSON.</dd>
index 05c3e38..adb8781 100644 (file)
@@ -23,8 +23,7 @@ import org.apache.samza.config.Config
 import java.nio.ByteBuffer
 
 /**
- * A serializer for bytes that is effectively a no-op but can be useful for
- * binary messages.
+ * A serializer for ByteBuffers.
  */
 class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
   def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde
@@ -34,7 +33,7 @@ class ByteBufferSerde extends Serde[ByteBuffer] {
   def toBytes(byteBuffer: ByteBuffer) = {
     if (byteBuffer != null) {
       val bytes = new Array[Byte](byteBuffer.remaining())
-      byteBuffer.get(bytes)
+      byteBuffer.duplicate().get(bytes)
       bytes
     } else {
       null
index 9401d70..eddfb0a 100644 (file)
@@ -26,16 +26,28 @@ import java.nio.ByteBuffer
 
 class TestByteBufferSerde {
   @Test
-  def test {
+  def testSerde {
     val serde = new ByteBufferSerde
     assertNull(serde.toBytes(null))
     assertNull(serde.fromBytes(null))
 
     val bytes = "A lazy way of creating a byte array".getBytes()
-    val testBytes = ByteBuffer.wrap(bytes)
-    testBytes.mark()
-    assertArrayEquals(serde.toBytes(testBytes), bytes)
-    testBytes.reset()
-    assertEquals(serde.fromBytes(bytes), testBytes)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.mark()
+    assertArrayEquals(serde.toBytes(byteBuffer), bytes)
+    byteBuffer.reset()
+    assertEquals(serde.fromBytes(bytes), byteBuffer)
+  }
+
+  @Test
+  def testSerializationPreservesInput {
+    val serde = new ByteBufferSerde
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.get() // advance position by 1
+    serde.toBytes(byteBuffer)
+
+    assertEquals(byteBuffer.capacity(), byteBuffer.limit())
+    assertEquals(1, byteBuffer.position())
   }
 }
\ No newline at end of file