Add a new ListGauge metric-type
authorRay Matharu <rmatharu@linkedin.com>
Thu, 14 Jun 2018 04:01:53 +0000 (21:01 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Thu, 14 Jun 2018 04:01:53 +0000 (21:01 -0700)
This PR introduces a ListGauge type,
A subsequent PR: https://github.com/apache/samza/pull/543 shows how this issued in conjunction with a diagnostics appender for error-tracking and dumpting to kafka via SnapshotReporter.

Author: Ray Matharu <rmatharu@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>, Cameron Lee <calee@linkedin.com>

Closes #541 from rayman7718/listgauge

28 files changed:
samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java
samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java [moved from samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java with 97% similarity]
samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java [new file with mode: 0644]
samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java
samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java
samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java
samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala
samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala

diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
new file mode 100644 (file)
index 0000000..545fd45
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+import org.apache.samza.util.TimestampedValue;
+
+
+/**
+ * A {@link ListGauge} is a {@link Metric} that buffers multiple instances of a type T in a list.
+ * {@link ListGauge}s are useful for maintaining, recording, or collecting values over time.
+ * For example, a set of specific logging-events (e.g., errors).
+ *
+ * Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation.
+ * Eviction happens during element addition or during reads of the ListGauge (getValues).
+ *
+ * All public methods are thread-safe.
+ *
+ */
+public class ListGauge<T> implements Metric {
+  private final String name;
+  private final Queue<TimestampedValue<T>> elements;
+
+  private final int maxNumberOfItems;
+  private final Duration maxStaleness;
+  private final static int DEFAULT_MAX_NITEMS = 1000;
+  private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);
+
+  /**
+   * Create a new {@link ListGauge} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
+   *
+   * @param name Name to be assigned
+   * @param maxNumberOfItems The max number of items that can remain in the listgauge
+   * @param maxStaleness The max staleness of items permitted in the listgauge
+   */
+  public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) {
+    this.name = name;
+    this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
+    this.maxNumberOfItems = maxNumberOfItems;
+    this.maxStaleness = maxStaleness;
+  }
+
+  /**
+   * Create a new {@link ListGauge} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
+   * @param name Name to be assigned
+   */
+  public ListGauge(String name) {
+    this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
+  }
+
+  /**
+   * Get the name assigned to this {@link ListGauge}
+   * @return the assigned name
+   */
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Get the Collection of values currently in the list.
+   * @return the collection of values
+   */
+  public Collection<T> getValues() {
+    this.evict();
+    return Collections.unmodifiableList(this.elements.stream().map(x -> x.getValue()).collect(Collectors.toList()));
+  }
+
+  /**
+   * Add a value to the list.
+   * (Timestamp assigned to this value is the current timestamp.)
+   * @param value The Gauge value to be added
+   */
+  public void add(T value) {
+    this.elements.add(new TimestampedValue<T>(value, Instant.now().toEpochMilli()));
+
+    // perform any evictions that may be needed.
+    this.evict();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void visit(MetricsVisitor visitor) {
+    visitor.listGauge(this);
+  }
+
+  /**
+   * Evicts entries from the elements list, based on the given item-size and durationThreshold.
+   */
+  private void evict() {
+    this.evictBasedOnSize();
+    this.evictBasedOnTimestamp();
+  }
+
+  /**
+   * Evicts entries from elements in FIFO order until it has maxNumberOfItems
+   */
+  private void evictBasedOnSize() {
+    int numToEvict = this.elements.size() - this.maxNumberOfItems;
+    while (numToEvict > 0) {
+      this.elements.poll(); // remove head
+      numToEvict--;
+    }
+  }
+
+  /**
+   * Removes entries from elements to ensure no element has a timestamp more than maxStaleness before current timestamp.
+   */
+  private void evictBasedOnTimestamp() {
+    Instant currentTimestamp = Instant.now();
+    TimestampedValue<T> valueInfo = this.elements.peek();
+
+    // continue remove-head if currenttimestamp - head-element's timestamp > durationThreshold
+    while (valueInfo != null
+        && currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > this.maxStaleness.toMillis()) {
+      this.elements.poll();
+      valueInfo = this.elements.peek();
+    }
+  }
+}
index 5a00d01..fa0fd39 100644 (file)
@@ -65,6 +65,15 @@ public interface MetricsRegistry {
   <T> Gauge<T> newGauge(String group, Gauge<T> value);
 
   /**
+   * Register a {@link org.apache.samza.metrics.ListGauge}
+   * @param group Group for this ListGauge
+   * @param listGauge the ListGauge to register
+   * @param <T> Type of the ListGauge
+   * @return ListGauge registered
+   */
+  <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge);
+
+  /**
    * Create and Register a new {@link org.apache.samza.metrics.Timer}
    * @param group Group for this Timer
    * @param name Name of to-be-created Timer
index 75abfe7..49a0929 100644 (file)
@@ -31,8 +31,13 @@ public abstract class MetricsVisitor {
 
   public abstract void timer(Timer timer);
 
+  public abstract <T> void listGauge(ListGauge<T> listGauge);
+
   public void visit(Metric metric) {
-    if (metric instanceof Counter) {
+    // Cast for metrics of type ListGauge
+    if (metric instanceof ListGauge<?>) {
+      listGauge((ListGauge<?>) metric);
+    } else if (metric instanceof Counter) {
       counter((Counter) metric);
     } else if (metric instanceof Gauge<?>) {
       gauge((Gauge<?>) metric);
index 739d68f..ba5b182 100644 (file)
@@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener {
 
   void onGauge(String group, Gauge<?> gauge);
 
+  void onListGauge(String group, ListGauge<?> listGauge);
+
   void onTimer(String group, Timer timer);
 }
index 3df855c..76b8216 100644 (file)
@@ -21,9 +21,11 @@ package org.apache.samza.util;
 
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 
+
 /**
  * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
  * recorded but a registry is still required.
@@ -50,6 +52,11 @@ public class NoOpMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
+  public <T> ListGauge<T> newListGauge(String group, ListGauge<T> listGauge) {
+    return listGauge;
+  }
+
+  @Override
   public Timer newTimer(String group, String name) {
     return new Timer(name);
   }
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java
new file mode 100644 (file)
index 0000000..eb91012
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metrics;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Iterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge}
+ */
+public class TestListGauge {
+
+  private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);
+
+  private <T> ListGauge<T> getListGaugeForTest() {
+    return new ListGauge<T>("sampleListGauge", 10, Duration.ofSeconds(60));
+  }
+
+  @Test
+  public void basicTest() {
+    ListGauge<String> listGauge = getListGaugeForTest();
+    listGauge.add("sampleValue");
+    Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge");
+    Assert.assertEquals("List sizes should match", listGauge.getValues().size(), 1);
+    Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValues().contains("sampleValue"), true);
+  }
+
+  @Test
+  public void testSizeEnforcement() {
+    ListGauge listGauge = getListGaugeForTest();
+    for (int i = 15; i > 0; i--) {
+      listGauge.add("v" + i);
+    }
+    Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValues().size(), 10);
+
+    int valueIndex = 10;
+    Collection<String> currentList = listGauge.getValues();
+    Iterator iterator = currentList.iterator();
+    while (iterator.hasNext()) {
+      String gaugeValue = (String) iterator.next();
+      Assert.assertTrue(gaugeValue.equals("v" + valueIndex));
+      valueIndex--;
+    }
+  }
+
+  @Test
+  public void testThreadSafety() throws InterruptedException {
+    ListGauge<Integer> listGauge = getListGaugeForTest();
+
+    Thread thread1 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        for (int i = 1; i <= 100; i++) {
+          listGauge.add(i);
+        }
+      }
+    });
+
+    Thread thread2 = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        for (int i = 1; i <= 100; i++) {
+          listGauge.add(i);
+        }
+      }
+    });
+
+    thread1.start();
+    thread2.start();
+
+    thread1.join(THREAD_TEST_TIMEOUT.toMillis());
+    thread2.join(THREAD_TEST_TIMEOUT.toMillis());
+
+    Assert.assertTrue("ListGauge should have the last 10 values", listGauge.getValues().size() == 10);
+    for (Integer gaugeValue : listGauge.getValues()) {
+      Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 100 && gaugeValue > 90);
+    }
+  }
+}
index 8076e02..c694d3f 100644 (file)
 
 package org.apache.samza.metrics;
 
-import static org.junit.Assert.*;
-
 import java.util.Arrays;
-
 import org.apache.samza.util.Clock;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestTimer {
 
   /*
index d29b975..01b69ed 100644 (file)
 
 package org.apache.samza.system.eventhub;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.collections4.map.HashedMap;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.ListGauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 public class TestMetricsRegistry implements MetricsRegistry {
 
   private Map<String, List<Counter>> counters = new HashedMap<>();
   private Map<String, List<Gauge<?>>> gauges = new HashedMap<>();
+  private Map<String, List<ListGauge>> listGauges = new HashedMap<>();
 
   public List<Counter> getCounters(String groupName) {
     return counters.get(groupName);
@@ -69,6 +70,13 @@ public class TestMetricsRegistry implements MetricsRegistry {
   }
 
   @Override
+  public ListGauge newListGauge(String group, ListGauge listGauge) {
+    listGauges.putIfAbsent(group, new ArrayList());
+    listGauges.get(group).add(listGauge);
+    return listGauge;
+  }
+
+  @Override
   public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
     if (!gauges.containsKey(group)) {
       gauges.put(group, new ArrayList<>());
index 53526d8..fc57846 100644 (file)
@@ -44,6 +44,10 @@ public class MetricGroup {
     return registry.newCounter(groupName, (prefix + name).toLowerCase());
   }
 
+  public <T> ListGauge<T> newListGauge(String name) {
+    return registry.newListGauge(groupName, new ListGauge(name));
+  }
+
   public <T> Gauge<T> newGauge(String name, T value) {
     return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value));
   }
index 5ede5e8..038abba 100644 (file)
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.storage.kv.KeyValueStore;
 
 /**
index 0f51798..df73e48 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.BroadcastOperatorSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
index 90a71a0..0cdde49 100644 (file)
@@ -21,7 +21,7 @@ package org.apache.samza.operators.impl;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;
index 6b5baae..82dc0bf 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKey;
 import org.apache.samza.operators.impl.store.TimeSeriesStore;
 import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.triggers.FiringType;
index f3d6948..c9b694d 100644 (file)
@@ -21,6 +21,8 @@
 package org.apache.samza.operators.impl.store;
 
 import org.apache.samza.storage.kv.ClosableIterator;
+import org.apache.samza.util.TimestampedValue;
+
 
 /**
  * A key-value store that allows entries to be queried and stored based on time ranges.
index f03d396..10a5967 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.samza.storage.kv.ClosableIterator;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.util.TimestampedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
index b14f8a4..5b0cdac 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl.store;
 import org.apache.samza.serializers.Serde;
 
 import java.nio.ByteBuffer;
+import org.apache.samza.util.TimestampedValue;
 
 
 public class TimestampedValueSerde<V> implements Serde<TimestampedValue<V>> {
index a218135..1b55784 100644 (file)
@@ -23,7 +23,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.serializers.Serde;
 
 import java.util.Arrays;
index 38d0d9c..be0fb26 100644 (file)
@@ -30,6 +30,7 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
index c122956..a26e666 100644 (file)
@@ -48,6 +48,8 @@ class SamzaContainerMetrics(
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
 
+  val exceptions = newListGauge[String]("exceptions")
+
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
   }
index 1520b0e..21ec763 100644 (file)
@@ -40,9 +40,11 @@ trait MetricsHelper {
 
   def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value)
 
+  def newListGauge[T](name: String) = metricGroup.newListGauge[T](name)
+
   /**
-   * Specify a dynamic gauge that always returns the latest value when polled. 
-   * The value closure must be thread safe, since metrics reporters may access 
+   * Specify a dynamic gauge that always returns the latest value when polled.
+   * The value closure must be thread safe, since metrics reporters may access
    * it from another thread.
    */
   def newGauge[T](name: String, value: () => T) = {
index 40ffee2..75ed6aa 100644 (file)
@@ -75,6 +75,21 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with
     newTimer(group, new Timer(name))
   }
 
+  /**
+    * Register a {@link org.apache.samza.metrics.ListGauge}
+    *
+    * @param group     Group for this ListGauge
+    * @param listGauge the ListGauge to register
+    * @tparam T the type of the list gauge
+    */
+  def newListGauge[T](group: String, listGauge: ListGauge[T]) = {
+    debug("Adding new listgauge %s %s %s." format(group, listGauge.getName, listGauge))
+    putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge)
+    val realListGauge = metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge[T]]
+    listeners.foreach(_.onListGauge(group, realListGauge))
+    realListGauge
+  }
+
   private def putAndGetGroup(group: String) = {
     metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric])
     metrics.get(group)
index 7da8a9c..c601b29 100644 (file)
 package org.apache.samza.metrics.reporter
 
 import java.lang.management.ManagementFactory
+
 import org.apache.samza.util.Logging
 import javax.management.MBeanServer
 import javax.management.ObjectName
+
 import org.apache.samza.config.Config
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Gauge
-import org.apache.samza.metrics.Timer
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.metrics.ReadableMetricsRegistry
-import org.apache.samza.metrics.ReadableMetricsRegistryListener
+import org.apache.samza.metrics._
+
 import scala.collection.JavaConverters._
-import org.apache.samza.metrics.MetricsVisitor
 import org.apache.samza.metrics.JmxUtil._
 
 class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
@@ -52,6 +48,8 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
               def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
               def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
               def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry))))
+              def listGauge[T](listGauge: ListGauge[T]) = registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, name, sources(registry))))
+
             })
         }
       })
@@ -65,14 +63,15 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
         def onCounter(group: String, counter: Counter) {
           registerBean(new JmxCounter(counter, getObjectName(group, counter.getName, source)))
         }
-
         def onGauge(group: String, gauge: Gauge[_]) {
           registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source)))
         }
-
         def onTimer(group: String, timer: Timer) {
           registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source)))
         }
+        def onListGauge(group: String, listGauge: ListGauge[_]) {
+          registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, listGauge.getName, source)))
+        }
       }
     } else {
       warn("Trying to re-register a registry for source %s. Ignoring." format source)
@@ -110,6 +109,11 @@ class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extend
   def objectName = on
 }
 
+class JmxListGauge(g: org.apache.samza.metrics.ListGauge[Object], on: ObjectName) extends JmxGaugeMBean {
+  def getValue = g.getValues
+  def objectName = on
+}
+
 trait JmxCounterMBean extends MetricMBean {
   def getCount(): Long
 }
index 65ca49c..d300e90 100644 (file)
@@ -26,7 +26,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemStream
 import org.apache.samza.util.Logging
-
 import java.util.HashMap
 import java.util.Map
 import java.util.concurrent.Executors
@@ -83,15 +82,18 @@ class MetricsSnapshotReporter(
   }
 
   def stop = {
-    info("Stopping producer.")
 
-    producer.stop
+    // Scheduling an event with 0 delay to ensure flushing of metrics one last time before shutdown
+    executor.schedule(this,0, TimeUnit.SECONDS)
 
     info("Stopping reporter timer.")
-
+    // Allow the scheduled task above to finish, and block for termination (for max 60 seconds)
     executor.shutdown
     executor.awaitTermination(60, TimeUnit.SECONDS)
 
+    info("Stopping producer.")
+    producer.stop
+
     if (!executor.isTerminated) {
       warn("Unable to shutdown reporter timer.")
     }
@@ -112,6 +114,8 @@ class MetricsSnapshotReporter(
         registry.getGroup(group).asScala.foreach {
           case (name, metric) =>
             metric.visit(new MetricsVisitor {
+              // for listGauge the value is returned as a list, which gets serialized
+              def listGauge[T](listGauge: ListGauge[T]) = {groupMsg.put(name, listGauge.getValues)  }
               def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
               def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
               def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double)
@@ -133,12 +137,18 @@ class MetricsSnapshotReporter(
         metricsSnapshot
       }
 
-      producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized))
+      try {
+
+        producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized))
 
-      // Always flush, since we don't want metrics to get batched up.
-      producer.flush(source)
+        // Always flush, since we don't want metrics to get batched up.
+        producer.flush(source)
+      } catch  {
+        case e: Exception => error("Exception when flushing metrics for source %s " format(source), e)
+      }
     }
 
+
     debug("Finished flushing metrics.")
   }
 }
index b87e5ed..6fdcacc 100644 (file)
@@ -52,7 +52,7 @@ import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.InitableFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
index 0315a20..94e171a 100644 (file)
@@ -24,6 +24,7 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.ClosableIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.util.TimestampedValue;
 import org.junit.Assert;
 import org.junit.Test;
 
index 40015ec..1621e73 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.samza.operators.impl.store;
 
 import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.util.TimestampedValue;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
index 5c10987..c93591a 100644 (file)
@@ -46,6 +46,9 @@ object ApplicationMasterRestServlet {
       metricsRegistry.getGroup(group).asScala.foreach {
         case (name, metric) =>
           metric.visit(new MetricsVisitor() {
+            def listGauge[T](listGauge: ListGauge[T]) =
+              groupMap.put(name, listGauge.getValues)
+
             def counter(counter: Counter) =
               groupMap.put(counter.getName, counter.getCount: lang.Long)