SAMZA-957: Avoid unnecessary KV Store flushes (part 3)
authorJacob Maes <jacob.maes@gmail.com>
Tue, 7 Jun 2016 21:39:13 +0000 (14:39 -0700)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Tue, 7 Jun 2016 21:39:13 +0000 (14:39 -0700)
samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
samza-kv/src/test/scala/org/apache/samza/storage/kv/TestCachedStore.scala

index c28f8db..ae6717d 100644 (file)
@@ -63,18 +63,12 @@ class CachedStore[K, V](
   private val cache = new java.util.LinkedHashMap[K, CacheEntry[K, V]]((cacheSize * 1.2).toInt, 1.0f, true) {
     override def removeEldestEntry(eldest: java.util.Map.Entry[K, CacheEntry[K, V]]): Boolean = {
       val evict = super.size > cacheSize
-      // We need backwards compatibility with the previous broken flushing behavior for array keys.
-      if (evict || hasArrayKeys) {
+      if (evict) {
         val entry = eldest.getValue
         // if this entry hasn't been written out yet, flush it and all other dirty keys
         if (entry.dirty != null) {
-          if (hasArrayKeys) {
-            debug("Found a dirty entry and cache has array keys. Flushing.")
-            flush()
-          } else {
-            debug("Found a dirty entry. Calling putAll() on all dirty entries.")
-            putAllDirtyEntries()
-          }
+          debug("Found a dirty entry. Calling putAll() on all dirty entries.")
+          putAllDirtyEntries()
         }
       }
       evict
@@ -128,14 +122,14 @@ class CachedStore[K, V](
 
   override def range(from: K, to: K): KeyValueIterator[K, V] = {
     metrics.ranges.inc
-    flush()
+    putAllDirtyEntries()
 
     new CachedStoreIterator(store.range(from, to))
   }
 
   override def all(): KeyValueIterator[K, V] = {
     metrics.alls.inc
-    flush()
+    putAllDirtyEntries()
 
     new CachedStoreIterator(store.all())
   }
@@ -173,9 +167,19 @@ class CachedStore[K, V](
     }
 
     // putAll() dirty values if the write list is full.
-    if (dirtyCount >= writeBatchSize) {
+    val purgeNeeded = if (dirtyCount >= writeBatchSize) {
       debug("Dirty count %s >= write batch size %s. Calling putAll() on all dirty entries." format (dirtyCount, writeBatchSize))
+      true
+    } else if (hasArrayKeys) {
+      // Flush every time to support the following legacy behavior:
+      // If array keys are used with a cached store, get() will always miss the cache because of array equality semantics
+      // However, it will fall back to the underlying store which does support arrays.
+      true
+    } else {
+      false
+    }
 
+    if (purgeNeeded) {
       putAllDirtyEntries()
     }
   }
@@ -232,7 +236,7 @@ class CachedStore[K, V](
   private def checkKeyIsArray(key: K) {
     if (!containsArrayKeys && key.isInstanceOf[Array[_]]) {
       // Warn the first time that we see an array key.
-      warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. Consider using ByteBuffer, or a different key type.")
+      warn("Using arrays as keys results in unpredictable behavior since cache is implemented with a map. Consider using ByteBuffer, or a different key type, or turn off the cache altogether.")
       containsArrayKeys = true
     }
   }
index eee7447..96eb5fa 100644 (file)
 package org.apache.samza.storage.kv
 
 import java.util
+import java.util.Arrays
 
-import org.junit.Test
 import org.junit.Assert._
+import org.junit.Test
 import org.mockito.ArgumentCaptor
-import org.mockito.Mockito._
 import org.mockito.Matchers.anyObject
+import org.mockito.Mockito._
 
-import java.util.Arrays
 import scala.collection.JavaConverters._
 
 class TestCachedStore {
@@ -38,12 +38,60 @@ class TestCachedStore {
 
     assertFalse(store.hasArrayKeys)
     store.put("test1-key".getBytes("UTF-8"), "test1-value".getBytes("UTF-8"))
-    // Ensure we preserve old, broken flushing behavior for array keys
-    verify(kv).flush();
     assertTrue(store.hasArrayKeys)
   }
 
   @Test
+  def testLRUCacheEviction() {
+    val kv = spy(new MockKeyValueStore())
+    val store = new CachedStore[String, String](kv, 2, 2)
+    assertFalse("KV store should be empty", kv.all().hasNext)
+
+    // Below eviction threshold
+    store.put("test1-key", "test1-value")
+    assertFalse("Entries should not have been purged yet", kv.all().hasNext)
+
+    // Batch limit reached
+    store.put("test2-key", "test2-value")
+    assertTrue("Entries should be purged as soon as there are batchSize dirty entries", kv.all().hasNext)
+
+    // kv.putAll() should have been called, verified below.
+
+    // All dirty values should have been added to the underlying store
+    // KV store should have both items
+    val kvItr = kv.all();
+    assertNotNull(kvItr.next())
+    assertNotNull(kvItr.next())
+    assertFalse(kvItr.hasNext)
+
+    // Above eviction threshold but eldest entries are not dirty
+    store.put("test3-key", "test3-value")
+
+    // KV store should not have the 3rd item. We only purge if the batch size is exceeded or if the eldest(expiring) entry is dirty.
+    val kvItr2 = kv.all();
+    assertNotNull(kvItr2.next())
+    assertNotNull(kvItr2.next())
+    assertFalse(kvItr2.hasNext)
+
+    // Force the dirty key to be the eldest by reading a different key
+    store.get("test2-key")
+
+    // Add one more. We should not purge all items again. Only when dirty items exceed the threshold.
+    store.put("test4-key", "test4-value")
+
+    // The eldest item should have been purged along with the just-added item, so the KV store should have all 4 items.
+    val kvItr3 = kv.all();
+    assertNotNull(kvItr3.next())
+    assertNotNull(kvItr3.next())
+    assertNotNull(kvItr3.next())
+    assertNotNull(kvItr3.next())
+    assertFalse(kvItr3.hasNext)
+
+    // There should have been 2 purges; one for exceeding the batch size, and one for expiring a dirty cache entry.
+    verify(kv, times(2)).putAll(anyObject());
+  }
+
+  @Test
   def testIterator() {
     val kv = new MockKeyValueStore()
     val store = new CachedStore[String, String](kv, 100, 100)