SAMZA-1736: Add counters and timers for batch get/put/delete operations in KeyValueSt...
authorPrateek Maheshwari <pmaheshwari@linkedin.com>
Fri, 1 Jun 2018 20:17:37 +0000 (13:17 -0700)
committerPrateek Maheshwari <pmaheshwari@linkedin.com>
Fri, 1 Jun 2018 20:17:37 +0000 (13:17 -0700)
Author: Prateek Maheshwari <pmaheshwari@linkedin.com>

Reviewers: Cameron Lee <calee@linkedin.com>, Shanthoosh Venkatraman <svenkatr@linkedin.com>

Closes #539 from prateekm/store-metrics

samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStoreMetrics.scala

index 6f3794d..f25097c 100644 (file)
@@ -154,11 +154,12 @@ class RocksDbKeyValueStore(
   }
 
   def put(key: Array[Byte], value: Array[Byte]): Unit = ifOpen {
-    metrics.puts.inc
     require(key != null, "Null key not allowed.")
     if (value == null) {
+      metrics.deletes.inc
       db.delete(writeOptions, key)
     } else {
+      metrics.puts.inc
       metrics.bytesWritten.inc(key.length + value.length)
       db.put(writeOptions, key, value)
     }
@@ -166,16 +167,17 @@ class RocksDbKeyValueStore(
 
   // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
   def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
+    metrics.putAlls.inc()
     val iter = entries.iterator
     var wrote = 0
     var deletes = 0
     while (iter.hasNext) {
-      wrote += 1
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
         db.delete(writeOptions, curr.getKey)
       } else {
+        wrote += 1
         val key = curr.getKey
         val value = curr.getValue
         metrics.bytesWritten.inc(key.length + value.length)
@@ -187,7 +189,6 @@ class RocksDbKeyValueStore(
   }
 
   def delete(key: Array[Byte]): Unit = ifOpen {
-    metrics.deletes.inc
     put(key, null)
   }
 
index 157c1bc..963dce4 100644 (file)
@@ -20,7 +20,7 @@
 package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Logging
-import org.apache.samza.storage.{StoreProperties, StorageEngine}
+import org.apache.samza.storage.{StorageEngine, StoreProperties}
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.util.TimerUtil
 
@@ -52,8 +52,11 @@ class KeyValueStorageEngine[K, V](
   }
 
   override def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
-    metrics.gets.inc(keys.size)
-    wrapperStore.getAll(keys)
+    updateTimer(metrics.getAllNs) {
+      metrics.getAlls.inc()
+      metrics.gets.inc(keys.size)
+      wrapperStore.getAll(keys)
+    }
   }
 
   def put(key: K, value: V) = {
@@ -64,8 +67,7 @@ class KeyValueStorageEngine[K, V](
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) = {
-    metrics.puts.inc(entries.size)
-    wrapperStore.putAll(entries)
+    doPutAll(wrapperStore, entries)
   }
 
   def delete(key: K) = {
@@ -76,8 +78,11 @@ class KeyValueStorageEngine[K, V](
   }
 
   override def deleteAll(keys: java.util.List[K]) = {
-    metrics.deletes.inc(keys.size)
-    wrapperStore.deleteAll(keys)
+    updateTimer(metrics.deleteAllNs) {
+      metrics.deleteAlls.inc()
+      metrics.deletes.inc(keys.size)
+      wrapperStore.deleteAll(keys)
+    }
   }
 
   def range(from: K, to: K) = {
@@ -110,17 +115,17 @@ class KeyValueStorageEngine[K, V](
       batch.add(new Entry(keyBytes, valBytes))
 
       if (batch.size >= batchSize) {
-        rawStore.putAll(batch)
+        doPutAll(rawStore, batch)
         batch.clear()
       }
 
       if (valBytes != null) {
-        metrics.restoredBytes.inc(valBytes.size)
-        metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.size)
+        metrics.restoredBytes.inc(valBytes.length)
+        metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length)
       }
 
-      metrics.restoredBytes.inc(keyBytes.size)
-      metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.size)
+      metrics.restoredBytes.inc(keyBytes.length)
+      metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length)
 
       metrics.restoredMessages.inc()
       metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1)
@@ -134,7 +139,7 @@ class KeyValueStorageEngine[K, V](
     info(count + " total entries restored.")
 
     if (batch.size > 0) {
-      rawStore.putAll(batch)
+      doPutAll(rawStore, batch)
     }
   }
 
@@ -159,6 +164,14 @@ class KeyValueStorageEngine[K, V](
     wrapperStore.close()
   }
 
+  private def doPutAll[Key, Value](store: KeyValueStore[Key, Value], entries: java.util.List[Entry[Key, Value]]) = {
+    updateTimer(metrics.putAllNs) {
+      metrics.putAlls.inc()
+      metrics.puts.inc(entries.size)
+      store.putAll(entries)
+    }
+  }
+
   override def getStoreProperties: StoreProperties = storeProperties
 
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
index a2c812e..92889ed 100644 (file)
@@ -28,27 +28,32 @@ class KeyValueStorageEngineMetrics(
   val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
 
   val gets = newCounter("gets")
-  val ranges = newCounter("ranges")
-  val alls = newCounter("alls")
+  val getAlls = newCounter("get-alls")
   val puts = newCounter("puts")
+  val putAlls = newCounter("put-alls")
   val deletes = newCounter("deletes")
+  val deleteAlls = newCounter("delete-alls")
   val flushes = newCounter("flushes")
+  val alls = newCounter("alls")
+  val ranges = newCounter("ranges")
   val snapshots = newCounter("snapshots")
 
-  val restoredMessages = newCounter("messages-restored") //Deprecated
-  val restoredMessagesGauge = newGauge("restored-messages", 0)
-
-  val restoredBytes = newCounter("messages-bytes") //Deprecated
-  val restoredBytesGauge = newGauge("restored-bytes", 0)
-
-
   val getNs = newTimer("get-ns")
+  val getAllNs = newTimer("get-all-ns")
   val putNs = newTimer("put-ns")
+  val putAllNs = newTimer("put-all-ns")
   val deleteNs = newTimer("delete-ns")
+  val deleteAllNs = newTimer("delete-all-ns")
   val flushNs = newTimer("flush-ns")
   val allNs = newTimer("all-ns")
   val rangeNs = newTimer("range-ns")
   val snapshotNs = newTimer("snapshot-ns")
 
+  val restoredMessages = newCounter("messages-restored") //Deprecated
+  val restoredMessagesGauge = newGauge("restored-messages", 0)
+
+  val restoredBytes = newCounter("messages-bytes") //Deprecated
+  val restoredBytesGauge = newGauge("restored-bytes", 0)
+
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file
index 967d509..a73ad04 100644 (file)
@@ -27,11 +27,12 @@ class KeyValueStoreMetrics(
 
   val gets = newCounter("gets")
   val getAlls = newCounter("getAlls")
-  val ranges = newCounter("ranges")
-  val alls = newCounter("alls")
   val puts = newCounter("puts")
+  val putAlls = newCounter("putAlls")
   val deletes = newCounter("deletes")
   val deleteAlls = newCounter("deleteAlls")
+  val alls = newCounter("alls")
+  val ranges = newCounter("ranges")
   val flushes = newCounter("flushes")
   val bytesWritten = newCounter("bytes-written")
   val bytesRead = newCounter("bytes-read")