SAMZA-1733: Adding filter mechanism to MetricsSnapshotReporter
authorrmatharu@linkedin.com <rmatharu@linkedin.com>
Thu, 2 Aug 2018 01:40:27 +0000 (18:40 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Thu, 2 Aug 2018 01:40:27 +0000 (18:40 -0700)
This regex based filter mechanism allow for blacklisting of metrics being reporter by the Snapshot Reporter, by specifying the blacklist in config.

* Backwards compatible -- no config => reports everything.
* Regex based, e.g.,
metrics.reporter.snapshot.blacklist=
.*
or
^(?!.*?(?:SamzaContainerMetrics|TaskInstanceMetrics)).*$

to report only container metrics and task instance metrics.

* Uses a hashset to ensure blacklisted metric are matched against the regex only once.

Author: rmatharu@linkedin.com <rmatharu@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #591 from rmatharu/filter

samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java [new file with mode: 0644]

index e9b6b76..258228c 100644 (file)
@@ -30,6 +30,7 @@ object MetricsConfig {
   val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"
   val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval"
   val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
+  val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist"
 
   implicit def Config2Metrics(config: Config) = new MetricsConfig(config)
 }
@@ -43,6 +44,8 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getMetricsReporterInterval(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name)
 
+  def getBlacklist(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name)
+
   /**
    * Returns a list of all metrics names from the config file. Useful for
    * getting individual metrics.
index eca22ff..95e6705 100644 (file)
@@ -43,6 +43,7 @@ import scala.collection.JavaConverters._
  * taskName // container_567890
  * host // eat1-app128.gird
  * version // 0.0.1
+  * blacklist // Regex of metrics to ignore when flushing
  */
 class MetricsSnapshotReporter(
   producer: SystemProducer,
@@ -55,6 +56,7 @@ class MetricsSnapshotReporter(
   samzaVersion: String,
   host: String,
   serializer: Serializer[MetricsSnapshot] = null,
+  blacklist: Option[String],
   clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
 
   val execEnvironmentContainerId = Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("")
@@ -63,6 +65,7 @@ class MetricsSnapshotReporter(
     new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
   val resetTime = clock()
   var registries = List[(String, ReadableMetricsRegistry)]()
+  var blacklistedMetrics = Set[String]()
 
   info("got metrics snapshot reporter properties [job name: %s, job id: %s, containerName: %s, version: %s, samzaVersion: %s, host: %s, pollingInterval %s]"
     format (jobName, jobId, containerName, version, samzaVersion, host, pollingInterval))
@@ -117,16 +120,21 @@ class MetricsSnapshotReporter(
 
         registry.getGroup(group).asScala.foreach {
           case (name, metric) =>
-            metric.visit(new MetricsVisitor {
-              // for listGauge the value is returned as a list, which gets serialized
-              def listGauge[T](listGauge: ListGauge[T]) = {groupMsg.put(name, listGauge.getValues)  }
-              def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
-              def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
-              def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double)
-            })
+            if (!shouldIgnore(group, name)) {
+              metric.visit(new MetricsVisitor {
+                // for listGauge the value is returned as a list, which gets serialized
+                def listGauge[T](listGauge: ListGauge[T]) = { groupMsg.put(name, listGauge.getValues) }
+                def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
+                def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object])
+                def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double)
+              })
+            }
         }
 
-        metricsMsg.put(group, groupMsg)
+        // dont emit empty groups
+        if(!groupMsg.isEmpty) {
+          metricsMsg.put(group, groupMsg)
+        }
       })
 
       val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime)
@@ -155,4 +163,21 @@ class MetricsSnapshotReporter(
 
     debug("Finished flushing metrics.")
   }
+
+
+  def shouldIgnore(group: String, metricName: String) = {
+    var isBlacklisted = blacklist.isDefined
+    val fullMetricName = group + "." + metricName
+
+    if (isBlacklisted && !blacklistedMetrics.contains(fullMetricName)) {
+      if (fullMetricName.matches(blacklist.get)) {
+        blacklistedMetrics += fullMetricName
+        info("Blacklisted metric %s because it matched blacklist regex: %s" format(fullMetricName, blacklist.get))
+      } else {
+        isBlacklisted = false
+      }
+    }
+
+    isBlacklisted
+  }
 }
index e41d4a8..6155f98 100644 (file)
@@ -107,6 +107,10 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
       .getOrElse("60").toInt
 
     info("Setting polling interval to %d" format pollingInterval)
+
+    val blacklist = config.getBlacklist(name)
+    info("Setting blacklist to %s" format blacklist)
+
     val reporter = new MetricsSnapshotReporter(
       producer,
       systemStream,
@@ -117,7 +121,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
       version,
       samzaVersion,
       Util.getLocalHost.getHostName,
-      serde)
+      serde, blacklist)
 
     reporter.register(this.getClass.getSimpleName.toString, registry)
 
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/TestMetricsSnapshotReporter.java
new file mode 100644 (file)
index 0000000..1ddf70f
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.inmemory.InMemorySystemProducer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.Some;
+import scala.runtime.AbstractFunction0;
+
+
+public class TestMetricsSnapshotReporter {
+  private MetricsSnapshotReporter metricsSnapshotReporter;
+  private static final String BLACKLIST_ALL = ".*";
+  private static final String BLACKLIST_NONE = "";
+  private static final String BLACKLIST_GROUPS = ".*(SystemConsumersMetrics|CachedStoreMetrics).*";
+  private static final String BLACKLIST_ALL_BUT_TWO_GROUPS = "^(?!.*?(?:SystemConsumersMetrics|CachedStoreMetrics)).*$";
+
+  @Test
+  public void testBlacklistAll() {
+    this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_ALL);
+
+    Assert.assertTrue("Should ignore all metrics",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.kafka.KafkaSystemProducerMetrics",
+            "kafka-flush-ns"));
+
+    Assert.assertTrue("Should ignore all metrics",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.storage.kv.LoggedStoreMetrics", "stats-ranges"));
+
+    Assert.assertTrue("Should ignore all metrics",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemProducersMetrics", "flushes"));
+  }
+
+  @Test
+  public void testBlacklistNone() {
+    this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_NONE);
+
+    Assert.assertFalse("Should not ignore any metrics",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.kafka.KafkaSystemProducerMetrics",
+            "kafka-flush-ns"));
+
+    Assert.assertFalse("Should not ignore any metrics",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.storage.kv.LoggedStoreMetrics", "stats-ranges"));
+
+    Assert.assertFalse("Should not ignore any metrics",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemProducersMetrics", "flushes"));
+  }
+
+  @Test
+  public void testBlacklistGroup() {
+    this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_GROUPS);
+    Assert.assertTrue("Should ignore all metrics from this group",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemConsumersMetrics", "poll-ns"));
+
+    Assert.assertTrue("Should ignore all metrics from this group",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemConsumersMetrics",
+            "unprocessed-messages"));
+
+    Assert.assertTrue("Should ignore all metrics from this group",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.storage.kv.CachedStoreMetrics",
+            "storename-stats-flushes"));
+
+    Assert.assertFalse("Should not ignore any other group",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.kafka.KafkaSystemConsumerMetrics",
+            "poll-count"));
+  }
+
+  @Test
+  public void testBlacklistAllButTwoGroups() {
+    this.metricsSnapshotReporter = getMetricsSnapshotReporter(BLACKLIST_ALL_BUT_TWO_GROUPS);
+
+    Assert.assertFalse("Should not ignore this group",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.SystemConsumersMetrics", "poll-ns"));
+
+    Assert.assertFalse("Should not ignore this group",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.storage.kv.CachedStoreMetrics",
+            "storename-stats-flushes"));
+
+    Assert.assertTrue("Should ignore all metrics from any other groups",
+        this.metricsSnapshotReporter.shouldIgnore("org.apache.samza.system.kafka.KafkaSystemConsumerMetrics",
+            "poll-count"));
+  }
+
+  private MetricsSnapshotReporter getMetricsSnapshotReporter(String blacklist) {
+    return new MetricsSnapshotReporter(new InMemorySystemProducer("test system", null),
+        new SystemStream("test system", "test stream"), 60000, "test job", "test jobID", "samza-container-0",
+        "test version", "test samza version", "test host", new MetricsSnapshotSerdeV2(), new Some<>(blacklist),
+        new AbstractFunction0<Object>() {
+          @Override
+          public Object apply() {
+            return System.currentTimeMillis();
+          }
+        });
+  }
+}