SAMZA-1751: Refactored metrics for table API
authorWei Song <wsong@wsong-mn2.linkedin.biz>
Fri, 15 Jun 2018 23:27:45 +0000 (16:27 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Fri, 15 Jun 2018 23:27:45 +0000 (16:27 -0700)
Refactored metrics for table API
 - Added TableMetricsUtil that encapsulates required parameters, maintains naming consistency and simplifies metrics creation API for tables.
 - Added metrics to local table
 - Maintained consistency between local, remote and caching table

Author: Wei Song <wsong@wsong-mn2.linkedin.biz>

Reviewers: Peng Du<pdu@linkedin.com>

Closes #555 from weisong44/table-metrics

12 files changed:
samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java [new file with mode: 0644]
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java

index 989828c..23e4f7f 100644 (file)
@@ -26,10 +26,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.utils.DefaultTableReadMetrics;
+import org.apache.samza.table.utils.DefaultTableWriteMetrics;
+import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.task.TaskContext;
 
 import com.google.common.base.Preconditions;
@@ -73,6 +75,10 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
   // Use stripe based locking to allow parallelism of disjoint keys.
   private final Striped<Lock> stripedLocks;
 
+  // Metrics
+  private DefaultTableReadMetrics readMetrics;
+  private DefaultTableWriteMetrics writeMetrics;
+
   // Common caching stats
   private AtomicLong hitCount = new AtomicLong();
   private AtomicLong missCount = new AtomicLong();
@@ -91,14 +97,18 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
    */
   @Override
   public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    MetricsRegistry metricsRegistry = taskContext.getMetricsRegistry();
-    metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-hit-rate", () -> hitRate()));
-    metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-miss-rate", () -> missRate()));
-    metricsRegistry.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-req-count", () -> requestCount()));
+    readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
+    writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+    tableMetricsUtil.newGauge("hit-rate", () -> hitRate());
+    tableMetricsUtil.newGauge("miss-rate", () -> missRate());
+    tableMetricsUtil.newGauge("req-count", () -> requestCount());
   }
 
   @Override
   public V get(K key) {
+    readMetrics.numGets.inc();
+    long startNs = System.nanoTime();
     V value = cache.get(key);
     if (value == null) {
       missCount.incrementAndGet();
@@ -121,18 +131,24 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
     } else {
       hitCount.incrementAndGet();
     }
+    readMetrics.getNs.update(System.nanoTime() - startNs);
     return value;
   }
 
   @Override
   public Map<K, V> getAll(List<K> keys) {
+    readMetrics.numGetAlls.inc();
+    long startNs = System.nanoTime();
     Map<K, V> getAllResult = new HashMap<>();
     keys.stream().forEach(k -> getAllResult.put(k, get(k)));
+    readMetrics.getAllNs.update(System.nanoTime() - startNs);
     return getAllResult;
   }
 
   @Override
   public void put(K key, V value) {
+    writeMetrics.numPuts.inc();
+    long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
     Lock lock = stripedLocks.get(key);
     try {
@@ -144,16 +160,22 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
     } finally {
       lock.unlock();
     }
+    writeMetrics.putNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public void putAll(List<Entry<K, V>> entries) {
+    writeMetrics.numPutAlls.inc();
+    long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
     entries.forEach(e -> put(e.getKey(), e.getValue()));
+    writeMetrics.putAllNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public void delete(K key) {
+    writeMetrics.numDeletes.inc();
+    long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
     Lock lock = stripedLocks.get(key);
     try {
@@ -163,18 +185,25 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
     } finally {
       lock.unlock();
     }
+    writeMetrics.deleteNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public void deleteAll(List<K> keys) {
+    writeMetrics.numDeleteAlls.inc();
+    long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
     keys.stream().forEach(k -> delete(k));
+    writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public synchronized void flush() {
+    writeMetrics.numFlushes.inc();
+    long startNs = System.nanoTime();
     Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
     rwTable.flush();
+    writeMetrics.flushNs.update(System.nanoTime() - startNs);
   }
 
   @Override
index 52d0c94..797d963 100644 (file)
@@ -88,7 +88,9 @@ public class CachingTableProvider implements TableProvider {
 
     int stripes = Integer.parseInt(cachingTableSpec.getConfig().get(LOCK_STRIPES));
     boolean isWriteAround = Boolean.parseBoolean(cachingTableSpec.getConfig().get(WRITE_AROUND));
-    return new CachingTable(cachingTableSpec.getId(), table, cache, stripes, isWriteAround);
+    CachingTable cachingTable = new CachingTable(cachingTableSpec.getId(), table, cache, stripes, isWriteAround);
+    cachingTable.init(containerContext, taskContext);
+    return cachingTable;
   }
 
   @Override
index 3f8ab51..fcded2f 100644 (file)
@@ -24,10 +24,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.caching.SupplierGauge;
+import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.task.TaskContext;
 
 import com.google.common.cache.Cache;
@@ -51,17 +50,14 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
     this.cache = cache;
   }
 
-  private void registerMetrics(String tableId, Cache cache, MetricsRegistry metricsReg) {
-    // hit- and miss-rate are provided by CachingTable.
-    metricsReg.newGauge(GROUP_NAME, new SupplierGauge(tableId + "-evict-count", () -> cache.stats().evictionCount()));
-  }
-
   /**
    * {@inheritDoc}
    */
   @Override
   public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    registerMetrics(tableId, cache, taskContext.getMetricsRegistry());
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+    // hit- and miss-rate are provided by CachingTable.
+    tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount());
   }
 
   @Override
index 7395b22..1ba26c7 100644 (file)
@@ -66,6 +66,7 @@ public class GuavaCacheTableProvider implements TableProvider {
   public Table getTable() {
     Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, guavaCacheTableSpec.getConfig().get(GUAVA_CACHE));
     GuavaCacheTable table = new GuavaCacheTable(guavaCacheTableSpec.getId(), guavaCache);
+    table.init(containerContext, taskContext);
     guavaTables.add(table);
     return table;
   }
index a640efb..95f8cfa 100644 (file)
@@ -23,10 +23,11 @@ import java.util.List;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.DefaultTableWriteMetrics;
+import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.RateLimiter;
 
@@ -42,17 +43,13 @@ import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
  * @param <V> the type of the value in this table
  */
 public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
+
   protected final TableWriteFunction<K, V> writeFn;
   protected final CreditFunction<K, V> writeCreditFn;
   protected final boolean rateLimitWrites;
 
-  protected Timer putNs;
-  protected Timer deleteNs;
-  protected Timer flushNs;
+  protected DefaultTableWriteMetrics writeMetrics;
   protected Timer putThrottleNs; // use single timer for all write operations
-  protected Counter numPuts;
-  protected Counter numDeletes;
-  protected Counter numFlushes;
 
   public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
       RateLimiter ratelimiter, CreditFunction<K, V> readCreditFn, CreditFunction<K, V> writeCreditFn) {
@@ -70,13 +67,9 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
     super.init(containerContext, taskContext);
-    putNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-ns");
-    putThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-put-throttle-ns");
-    deleteNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-delete-ns");
-    flushNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-flush-ns");
-    numPuts = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-puts");
-    numDeletes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-deletes");
-    numFlushes = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-flushes");
+    writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+    putThrottleNs = tableMetricsUtil.newTimer("put-throttle-ns");
   }
 
   /**
@@ -91,13 +84,13 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
     }
 
     try {
-      numPuts.inc();
+      writeMetrics.numPuts.inc();
       if (rateLimitWrites) {
         throttle(key, value, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
       }
       long startNs = System.nanoTime();
       writeFn.put(key, value);
-      putNs.update(System.nanoTime() - startNs);
+      writeMetrics.putNs.update(System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = String.format("Failed to put a record, key=%s, value=%s", key, value);
       logger.error(errMsg, e);
@@ -111,7 +104,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void putAll(List<Entry<K, V>> entries) {
     try {
+      writeMetrics.numPutAlls.inc();
+      long startNs = System.nanoTime();
       writeFn.putAll(entries);
+      writeMetrics.putAllNs.update(System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = String.format("Failed to put records: %s", entries);
       logger.error(errMsg, e);
@@ -125,13 +121,13 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void delete(K key) {
     try {
-      numDeletes.inc();
+      writeMetrics.numDeletes.inc();
       if (rateLimitWrites) {
         throttle(key, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
       }
       long startNs = System.nanoTime();
       writeFn.delete(key);
-      deleteNs.update(System.nanoTime() - startNs);
+      writeMetrics.deleteNs.update(System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = String.format("Failed to delete a record, key=%s", key);
       logger.error(errMsg, e);
@@ -145,7 +141,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void deleteAll(List<K> keys) {
     try {
+      writeMetrics.numDeleteAlls.inc();
       writeFn.deleteAll(keys);
+      long startNs = System.nanoTime();
+      writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = String.format("Failed to delete records, keys=%s", keys);
       logger.error(errMsg, e);
@@ -159,13 +158,13 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
   @Override
   public void flush() {
     try {
-      numFlushes.inc();
+      writeMetrics.numFlushes.inc();
       if (rateLimitWrites) {
         throttle(null, null, RL_WRITE_TAG, writeCreditFn, putThrottleNs);
       }
       long startNs = System.nanoTime();
       writeFn.flush();
-      flushNs.update(System.nanoTime() - startNs);
+      writeMetrics.flushNs.update(System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = "Failed to flush remote store";
       logger.error(errMsg, e);
index ca8e96b..d919d2f 100644 (file)
@@ -25,10 +25,11 @@ import java.util.Map;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.utils.DefaultTableReadMetrics;
+import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.RateLimiter;
 import org.slf4j.Logger;
@@ -63,6 +64,7 @@ import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
  * @param <V> the type of the value in this table
  */
 public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
+
   protected final String tableId;
   protected final Logger logger;
   protected final TableReadFunction<K, V> readFn;
@@ -71,9 +73,8 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
   protected final CreditFunction<K, V> readCreditFn;
   protected final boolean rateLimitReads;
 
-  protected Timer getNs;
+  protected DefaultTableReadMetrics readMetrics;
   protected Timer getThrottleNs;
-  protected Counter numGets;
 
   /**
    * Construct a RemoteReadableTable instance
@@ -101,9 +102,9 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
    */
   @Override
   public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    getNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-ns");
-    getThrottleNs = taskContext.getMetricsRegistry().newTimer(groupName, tableId + "-get-throttle-ns");
-    numGets = taskContext.getMetricsRegistry().newCounter(groupName, tableId + "-num-gets");
+    readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+    getThrottleNs = tableMetricsUtil.newTimer("get-throttle-ns");
   }
 
   /**
@@ -112,13 +113,13 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
   @Override
   public V get(K key) {
     try {
-      numGets.inc();
+      readMetrics.numGets.inc();
       if (rateLimitReads) {
         throttle(key, null, RL_READ_TAG, readCreditFn, getThrottleNs);
       }
       long startNs = System.nanoTime();
       V result = readFn.get(key);
-      getNs.update(System.nanoTime() - startNs);
+      readMetrics.getNs.update(System.nanoTime() - startNs);
       return result;
     } catch (Exception e) {
       String errMsg = String.format("Failed to get a record, key=%s", key);
@@ -134,7 +135,10 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
   public Map<K, V> getAll(List<K> keys) {
     Map<K, V> result;
     try {
+      readMetrics.numGetAlls.inc();
+      long startNs = System.nanoTime();
       result = readFn.getAll(keys);
+      readMetrics.getAllNs.update(System.nanoTime() - startNs);
     } catch (Exception e) {
       String errMsg = "Failed to get some records";
       logger.error(errMsg, e);
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
new file mode 100644 (file)
index 0000000..a327ae3
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Utility class that contains the default set of read metrics.
+ */
+public class DefaultTableReadMetrics {
+
+  public final Timer getNs;
+  public final Timer getAllNs;
+  public final Counter numGets;
+  public final Counter numGetAlls;
+
+  /**
+   * Constructor based on container and task container context
+   *
+   * @param containerContext container context
+   * @param taskContext task context
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public DefaultTableReadMetrics(SamzaContainerContext containerContext, TaskContext taskContext,
+      Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+    getNs = tableMetricsUtil.newTimer("get-ns");
+    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+    numGets = tableMetricsUtil.newCounter("num-gets");
+    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+  }
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
new file mode 100644 (file)
index 0000000..150ee9a
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
+
+
+public class DefaultTableWriteMetrics {
+
+  public final Timer putNs;
+  public final Timer putAllNs;
+  public final Timer deleteNs;
+  public final Timer deleteAllNs;
+  public final Timer flushNs;
+  public final Counter numPuts;
+  public final Counter numPutAlls;
+  public final Counter numDeletes;
+  public final Counter numDeleteAlls;
+  public final Counter numFlushes;
+
+  /**
+   * Utility class that contains the default set of write metrics.
+   *
+   * @param containerContext container context
+   * @param taskContext task context
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public DefaultTableWriteMetrics(SamzaContainerContext containerContext, TaskContext taskContext,
+      Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+    putNs = tableMetricsUtil.newTimer("put-ns");
+    putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+    deleteNs = tableMetricsUtil.newTimer("delete-ns");
+    deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+    flushNs = tableMetricsUtil.newTimer("flush-ns");
+    numPuts = tableMetricsUtil.newCounter("num-puts");
+    numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+    numDeletes = tableMetricsUtil.newCounter("num-deletes");
+    numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+    numFlushes = tableMetricsUtil.newCounter("num-flushes");
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
new file mode 100644 (file)
index 0000000..6805c64
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.utils;
+
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.caching.SupplierGauge;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Utility class to generate metrics that helps to encapsulate required parameters,
+ * maintains naming consistency and simplifies metrics creation API for tables.
+ */
+public class TableMetricsUtil {
+
+  private final MetricsRegistry metricsRegistry;
+  private final String groupName;
+  private final String tableId;
+
+  /**
+   * Constructor based on container context
+   *
+   * @param containerContext container context
+   * @param taskContext task context
+   * @param table underlying table
+   * @param tableId table Id
+   */
+  public TableMetricsUtil(SamzaContainerContext containerContext, TaskContext taskContext,
+      Table table, String tableId) {
+
+    Preconditions.checkNotNull(containerContext);
+    Preconditions.checkNotNull(table);
+    Preconditions.checkNotNull(tableId);
+
+    this.metricsRegistry = taskContext == null // The table is at container level, when the task
+        ? containerContext.metricsRegistry     // context passed in is null
+        : taskContext.getMetricsRegistry();
+    this.groupName = table.getClass().getSimpleName();
+    this.tableId = tableId;
+  }
+
+  /**
+   * Create a new counter by delegating to the underlying metrics registry
+   * @param name name of the counter
+   * @return newly created counter
+   */
+  public Counter newCounter(String name) {
+    return metricsRegistry.newCounter(groupName, getMetricFullName(name));
+  }
+
+  /**
+   * Create a new timer by delegating to the underlying metrics registry
+   * @param name name of the timer
+   * @return newly created timer
+   */
+  public Timer newTimer(String name) {
+    return metricsRegistry.newTimer(groupName, getMetricFullName(name));
+  }
+
+  /**
+   * Create a new gauge by delegating to the underlying metrics registry
+   * @param name name of the gauge
+   * @param supplier a function that supplies the value
+   * @param <T> type of the value
+   * @return newly created gauge
+   */
+  public <T> Gauge<T> newGauge(String name, Supplier<T> supplier) {
+    return metricsRegistry.newGauge(groupName, new SupplierGauge(getMetricFullName(name), supplier));
+  }
+
+  private String getMetricFullName(String name) {
+    return String.format("%s-%s", tableId, name);
+  }
+
+}
index 906ee1d..882ae0d 100644 (file)
@@ -20,7 +20,10 @@ package org.apache.samza.storage.kv;
 
 import java.util.List;
 
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.DefaultTableWriteMetrics;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -32,6 +35,8 @@ import org.apache.samza.table.ReadWriteTable;
 public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadableTable<K, V>
     implements ReadWriteTable<K, V> {
 
+  protected DefaultTableWriteMetrics writeMetrics;
+
   /**
    * Constructs an instance of {@link LocalStoreBackedReadWriteTable}
    * @param kvStore the backing store
@@ -40,10 +45,22 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
     super(tableId, kvStore);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    super.init(containerContext, taskContext);
+    writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
+  }
+
   @Override
   public void put(K key, V value) {
     if (value != null) {
+      writeMetrics.numPuts.inc();
+      long startNs = System.nanoTime();
       kvStore.put(key, value);
+      writeMetrics.putNs.update(System.nanoTime() - startNs);
     } else {
       delete(key);
     }
@@ -51,22 +68,34 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
 
   @Override
   public void putAll(List<Entry<K, V>> entries) {
-    entries.forEach(e -> kvStore.put(e.getKey(), e.getValue()));
+    writeMetrics.numPutAlls.inc();
+    long startNs = System.nanoTime();
+    kvStore.putAll(entries);
+    writeMetrics.putAllNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public void delete(K key) {
+    writeMetrics.numDeletes.inc();
+    long startNs = System.nanoTime();
     kvStore.delete(key);
+    writeMetrics.deleteNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public void deleteAll(List<K> keys) {
-    keys.forEach(k -> kvStore.delete(k));
+    writeMetrics.numDeleteAlls.inc();
+    long startNs = System.nanoTime();
+    kvStore.deleteAll(keys);
+    writeMetrics.deleteAllNs.update(System.nanoTime() - startNs);
   }
 
   @Override
   public void flush() {
+    writeMetrics.numFlushes.inc();
+    long startNs = System.nanoTime();
     kvStore.flush();
+    writeMetrics.flushNs.update(System.nanoTime() - startNs);
   }
 
 }
index 5ff58ab..8d79e0d 100644 (file)
@@ -20,11 +20,12 @@ package org.apache.samza.storage.kv;
 
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.apache.samza.table.ReadableTable;
 
 import com.google.common.base.Preconditions;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.utils.DefaultTableReadMetrics;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -35,8 +36,10 @@ import com.google.common.base.Preconditions;
  */
 public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> {
 
-  protected KeyValueStore<K, V> kvStore;
-  protected String tableId;
+  protected final KeyValueStore<K, V> kvStore;
+  protected final String tableId;
+
+  protected DefaultTableReadMetrics readMetrics;
 
   /**
    * Constructs an instance of {@link LocalStoreBackedReadableTable}
@@ -49,14 +52,30 @@ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V>
     this.kvStore = kvStore;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
+  }
+
   @Override
   public V get(K key) {
-    return kvStore.get(key);
+    readMetrics.numGets.inc();
+    long startNs = System.nanoTime();
+    V result = kvStore.get(key);
+    readMetrics.getNs.update(System.nanoTime() - startNs);
+    return result;
   }
 
   @Override
   public Map<K, V> getAll(List<K> keys) {
-    return keys.stream().collect(Collectors.toMap(k -> k, k -> kvStore.get(k)));
+    readMetrics.numGetAlls.inc();
+    long startNs = System.nanoTime();
+    Map<K, V> result = kvStore.getAll(keys);
+    readMetrics.getAllNs.update(System.nanoTime() - startNs);
+    return result;
   }
 
   @Override
index d30c18f..56818b5 100644 (file)
@@ -25,9 +25,11 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.storage.StorageEngine;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,9 +59,11 @@ public class TestLocalBaseStoreBackedTableProvider {
   @Test
   public void testInit() {
     StorageEngine store = mock(KeyValueStorageEngine.class);
+    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
     TaskContext taskContext = mock(TaskContext.class);
     when(taskContext.getStore(any())).thenReturn(store);
-    tableProvider.init(null, taskContext);
+    when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+    tableProvider.init(containerContext, taskContext);
     Assert.assertNotNull(tableProvider.getTable());
   }