SAMZA-1049 Enable support for reporting metrics from Monitors in SamzaRest.
authorShanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Tue, 22 Nov 2016 00:55:35 +0000 (16:55 -0800)
committerJacob Maes <jmaes@linkedin.com>
Tue, 22 Nov 2016 00:56:22 +0000 (16:56 -0800)
14 files changed:
docs/learn/documentation/versioned/rest/monitors.md
samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
samza-rest/src/main/java/org/apache/samza/monitor/MonitorFactory.java
samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitorFactory.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitorFactory.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java
samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java [new file with mode: 0644]
samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala

index 46678bb..52fd27e 100644 (file)
@@ -92,5 +92,7 @@ Set the config key `monitor.monitorName.scheduling.interval.ms` to the schedulin
 The configuration key `monitor.monitorName.scheduling.interval.ms` defines the periodic scheduling interval of
 the `monitor()` method in milli seconds.
 
+## Reporting metrics from Monitors
+Samza REST service allows the users to create and report metrics from their monitors. Reporting metrics to a metrics system is encapsulated by the metrics reporter, which should be defined in the samza-rest configuration file. Configurations for metrics reporters in Samza REST service are the same as [that of Samza Jobs](../container/metrics.md).
 
 ## [Resource Reference &raquo;](resource-directory.html)
diff --git a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
new file mode 100644 (file)
index 0000000..f22b0ee
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import scala.collection.JavaConversions;
+
+/**
+ * Helper class that instantiates the MetricsReporter.
+ */
+public class MetricsReporterLoader {
+
+  private MetricsReporterLoader() {
+  }
+
+  public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig config, String containerName) {
+    Map<String, MetricsReporter> metricsReporters = new HashMap<>();
+    for (String metricsReporterName : JavaConversions.seqAsJavaList(config.getMetricReporterNames())) {
+      String metricsFactoryClass = config.getMetricsFactoryClass(metricsReporterName).get();
+      if (metricsFactoryClass == null) {
+        throw new SamzaException(String.format("Metrics reporter %s missing .class config", metricsReporterName));
+      }
+      MetricsReporterFactory metricsReporterFactory = Util.getObj(metricsFactoryClass);
+      metricsReporters.put(metricsReporterName,
+                           metricsReporterFactory.getMetricsReporter(metricsReporterName,
+                                                                     containerName,
+                                                                     config));
+    }
+    return metricsReporters;
+  }
+}
index 227a1f0..f1d62c5 100644 (file)
@@ -50,7 +50,6 @@ import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.serializers.model.SamzaObjectMapper
@@ -73,7 +72,11 @@ import org.apache.samza.task.AsyncStreamTaskAdapter
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.HighResolutionClock
-import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Throttleable, Util}
+import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.Logging
+import org.apache.samza.util.Throttleable
+import org.apache.samza.util.MetricsReporterLoader
+import org.apache.samza.util.Util
 import org.apache.samza.util.Util.asScalaClock
 
 import scala.collection.JavaConversions._
@@ -333,17 +336,7 @@ object SamzaContainer extends Logging {
 
     info("Setting up metrics reporters.")
 
-    val reporters = config.getMetricReporterNames.map(reporterName => {
-      val metricsFactoryClassName = config
-        .getMetricsFactoryClass(reporterName)
-        .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
-
-      val reporter =
-        Util
-          .getObj[MetricsReporterFactory](metricsFactoryClassName)
-          .getMetricsReporter(reporterName, containerName, config)
-      (reporterName, reporter)
-    }).toMap
+    val reporters = MetricsReporterLoader.getMetricsReporters(config, containerName).toMap
 
     info("Got metrics reporters: %s" format reporters.keys)
 
index f24beb1..6c3081b 100644 (file)
 
 package org.apache.samza.metrics
 
-import org.apache.samza.SamzaException
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.Config
 import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.Logging
+import org.apache.samza.util.MetricsReporterLoader
+
+import scala.collection.JavaConverters._
 
 object ContainerProcessManagerMetrics {
   val sourceName = "ApplicationMaster"
@@ -40,19 +42,8 @@ class ContainerProcessManagerMetrics(
                                       val registry: ReadableMetricsRegistry) extends MetricsHelper  with Logging {
 
   val jvm = new JvmMetrics(registry)
-  val reporters = config.getMetricReporterNames.map(reporterName => {
-    val metricsFactoryClassName = config
-      .getMetricsFactoryClass(reporterName)
-      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
-
-    val reporter =
-      Util
-        .getObj[MetricsReporterFactory](metricsFactoryClassName)
-        .getMetricsReporter(reporterName, ContainerProcessManagerMetrics.sourceName, config)
-
-    reporter.register(ContainerProcessManagerMetrics.sourceName, registry)
-    (reporterName, reporter)
-  }).toMap
+  val reporters = MetricsReporterLoader.getMetricsReporters(config, ContainerProcessManagerMetrics.sourceName).asScala
+  reporters.values.foreach(_.register(ContainerProcessManagerMetrics.sourceName, registry))
 
    def start() {
     val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
index c38a5d8..b2e766f 100644 (file)
@@ -25,13 +25,6 @@ import org.apache.samza.metrics.MetricsRegistry;
  */
 public interface MonitorFactory {
 
-  /**
-   * @param config contains the configuration defined for the monitor.
-   * @param metricsRegistry instance that will allow the monitor
-   *                        implementations to register custom metrics
-   * @return Constructs and returns the monitor instance.
-   * @throws Exception if there was any problem with instantiating the monitor.
-   */
-  Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+  Monitor getMonitorInstance(String monitorName, MonitorConfig config, MetricsRegistry metricsRegistry)
     throws Exception;
 }
index dcf9e57..ff1268c 100644 (file)
@@ -21,22 +21,17 @@ package org.apache.samza.monitor;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.util.ClassLoaderHelper;
 
-
+/**
+ * Helper class that instantiates the Monitor.
+ */
 public class MonitorLoader {
 
-  /**
-   *
-   * @param monitorConfig contains the configuration defined for a particular monitor.
-   * @param metricsRegistry instance that will be used to register custom metrics.
-   * @return the instantiated monitor object.
-   * @throws InstantiationException when there is a exception instantiating the monitor.
-   */
-  public static Monitor instantiateMonitor(MonitorConfig monitorConfig, MetricsRegistry metricsRegistry)
+  public static Monitor instantiateMonitor(String monitorName, MonitorConfig monitorConfig, MetricsRegistry metricsRegistry)
       throws InstantiationException {
       String factoryClass = monitorConfig.getMonitorFactoryClass();
       try {
         MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass);
-        return monitorFactory.getMonitorInstance(monitorConfig, metricsRegistry);
+        return monitorFactory.getMonitorInstance(monitorName, monitorConfig, metricsRegistry);
       } catch (Exception e) {
         throw (InstantiationException)
             new InstantiationException("Unable to instantiate monitor with factory class " + factoryClass).initCause(e);
index 754ad82..80321f3 100644 (file)
@@ -65,7 +65,8 @@ public class SamzaMonitorService {
                     LOGGER.info("Scheduling monitor {} to run every {} ms", monitorName, schedulingIntervalInMs);
                     // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics.
                     // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet.
-                    scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs);
+                    scheduler.schedule(getRunnable(instantiateMonitor(monitorName, monitorConfig, metricsRegistry)),
+                                                   schedulingIntervalInMs);
                 } else {
                   // When MonitorFactoryClass is not defined in the config, ignore the monitor config
                   LOGGER.warn("Not scheduling the monitor: {} to run, since monitor factory class is not set in config.", monitorName);
index 2a3e83a..f3c482f 100644 (file)
  */
 package org.apache.samza.rest;
 
+import java.util.Map;
 import joptsimple.OptionSet;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.monitor.SamzaMonitorService;
 import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider;
 import org.apache.samza.util.CommandLine;
-import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.Util;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -49,17 +55,22 @@ import java.util.concurrent.ScheduledExecutorService;
 public class SamzaRestService {
 
   private static final Logger log = LoggerFactory.getLogger(SamzaRestService.class);
+  private static final String METRICS_SOURCE = "SamzaRest";
 
   private final Server server;
   private final ServletContextHandler context;
-
-
-  public SamzaRestService(SamzaRestConfig config) {
-    log.info("Creating new SamzaRestService with config: {}", config);
-    server = new Server(config.getPort());
-
-    context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-    context.setContextPath("/");
+  private final ReadableMetricsRegistry metricsRegistry;
+  private final Map<String, MetricsReporter> metricsReporters;
+
+  public SamzaRestService(Server server,
+                          ReadableMetricsRegistry metricsRegistry,
+                          Map<String, MetricsReporter> metricsReporters,
+                          ServletContextHandler context) {
+    this.server = server;
+    this.metricsRegistry = metricsRegistry;
+    this.metricsReporters = metricsReporters;
+    this.context = context;
+    this.context.setContextPath("/");
     server.setHandler(context);
   }
 
@@ -74,7 +85,12 @@ public class SamzaRestService {
       throws Exception {
     try {
       SamzaRestConfig config = parseConfig(args);
-      SamzaRestService restService = new SamzaRestService(config);
+      ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
+      log.info("Creating new SamzaRestService with config: {}", config);
+      MetricsConfig metricsConfig = new MetricsConfig(config);
+      Map<String, MetricsReporter> metricsReporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, Util.getLocalHost().getHostName());
+      SamzaRestService restService = new SamzaRestService(new Server(config.getPort()), metricsRegistry, metricsReporters,
+                                                          new ServletContextHandler(ServletContextHandler.SESSIONS));
 
       // Add applications
       SamzaRestApplication samzaRestApplication = new SamzaRestApplication(config);
@@ -85,7 +101,7 @@ public class SamzaRestService {
       ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1);
       ScheduledExecutorSchedulingProvider schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService);
       SamzaMonitorService monitorService = new SamzaMonitorService(config,
-                                                                   new NoOpMetricsRegistry(),
+                                                                   metricsRegistry,
                                                                    schedulingProvider);
       monitorService.start();
 
@@ -143,6 +159,12 @@ public class SamzaRestService {
    */
   public void start()
       throws Exception {
+    metricsReporters.forEach((reporterName, metricsReporter) -> {
+      log.info("Registering the metrics reporter : {},  with source :  {}.", reporterName, METRICS_SOURCE);
+      metricsReporter.register(METRICS_SOURCE, metricsRegistry);
+      log.info("Starting the metrics reporter : {}.", reporterName);
+      metricsReporter.start();
+    });
     log.info("Starting server on port {}", server.getConnectors()[0].getPort());
     server.start();
     log.info("Server is running");
@@ -155,6 +177,10 @@ public class SamzaRestService {
    */
   public void stop()
       throws Exception {
+    metricsReporters.forEach((reporterName, metricsReporter)  -> {
+      log.info("Stopping the metrics reporter : {}.", reporterName);
+      metricsReporter.stop();
+    });
     log.info("Stopping server");
     server.stop();
     log.info("Server is stopped");
index e75f494..17c3fe3 100644 (file)
@@ -56,7 +56,7 @@ public class TestMonitorService {
                                                         DummyMonitorFactory.class.getCanonicalName());
         Monitor monitor = null;
         try {
-            monitor = MonitorLoader.instantiateMonitor(new MonitorConfig(new MapConfig(configMap)),
+            monitor = MonitorLoader.instantiateMonitor("testMonitor", new MonitorConfig(new MapConfig(configMap)),
                                                        METRICS_REGISTRY);
         } catch (InstantiationException e) {
             fail();
index ccc534e..4289e8c 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.monitor.MonitorFactory;
 public class DummyMonitorFactory implements MonitorFactory {
 
   @Override
-  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+  public Monitor getMonitorInstance(String monitorName, MonitorConfig config, MetricsRegistry metricsRegistry)
       throws Exception {
     return new DummyMonitor();
   }
index af414b4..df203ec 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.monitor.MonitorFactory;
 public class ExceptionThrowingMonitorFactory implements MonitorFactory {
 
   @Override
-  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry) {
+  public Monitor getMonitorInstance(String monitorName, MonitorConfig config, MetricsRegistry metricsRegistry) {
     return new ExceptionThrowingMonitor();
   }
 }
index 5b19001..a1d61e6 100644 (file)
@@ -30,7 +30,7 @@ public class MockMonitorFactory implements MonitorFactory {
   public static final Monitor MOCK_MONITOR = Mockito.mock(Monitor.class);
 
   @Override
-  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+  public Monitor getMonitorInstance(String monitorName, MonitorConfig config, MetricsRegistry metricsRegistry)
       throws Exception {
     Mockito.reset(MOCK_MONITOR);
     return MOCK_MONITOR;
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java b/samza-rest/src/test/java/org/apache/samza/rest/TestSamzaRestService.java
new file mode 100644 (file)
index 0000000..e0f9074
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.rest;
+
+import com.google.common.collect.ImmutableMap;
+import junit.framework.TestCase;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestSamzaRestService extends TestCase {
+
+  private final Server server = Mockito.spy(new Server());
+
+  private final ReadableMetricsRegistry metricsRegistry = Mockito.mock(ReadableMetricsRegistry.class);
+
+  private final ServletContextHandler contextHandler = Mockito.mock(ServletContextHandler.class);
+
+  private final MetricsReporter metricsReporter = Mockito.mock(MetricsReporter.class);
+
+  private SamzaRestService samzaRestService;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    samzaRestService = new SamzaRestService(server, metricsRegistry,
+                                            ImmutableMap.of("testReporter", metricsReporter), contextHandler);
+  }
+
+  @Test
+  public void testStartShouldStartTheMetricsReportersAndServer() throws Exception {
+    Connector connector = Mockito.mock(Connector.class);
+    int testServerPort = 100;
+    Mockito.doReturn(testServerPort).when(connector).getPort();
+    Mockito.when(server.getConnectors()).thenReturn(new Connector[]{connector});
+    Mockito.doNothing().when(server).start();
+    samzaRestService.start();
+    Mockito.verify(metricsReporter).start();
+    Mockito.verify(metricsReporter).register("SamzaRest", metricsRegistry);
+    Mockito.verify(server).start();
+  }
+
+  @Test
+  public void testStopShouldStopTheMetricsReportersAndStopTheServer() throws Exception {
+    samzaRestService.stop();
+    Mockito.verify(metricsReporter).stop();
+    Mockito.verify(server).stop();
+  }
+}
index 8a5b4aa..5cc0475 100644 (file)
 package org.apache.samza.job.yarn
 
 import org.apache.samza.clustermanager.SamzaApplicationState
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.config.Config
-import org.apache.samza.task.TaskContext
-import org.apache.samza.Partition
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.util.Util
-import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
-import java.util.Timer
-import java.util.TimerTask
+import org.apache.samza.util.MetricsReporterLoader
+import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.metrics.MetricsHelper
 
+import scala.collection.JavaConverters._
+
 object SamzaAppMasterMetrics {
   val sourceName = "ApplicationMaster"
 }
@@ -51,19 +43,8 @@ class SamzaAppMasterMetrics(
                              val state: SamzaApplicationState,
                              val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
 
-  val reporters = config.getMetricReporterNames.map(reporterName => {
-    val metricsFactoryClassName = config
-      .getMetricsFactoryClass(reporterName)
-      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
-
-    val reporter =
-      Util
-        .getObj[MetricsReporterFactory](metricsFactoryClassName)
-        .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, config)
-
-    reporter.register(SamzaAppMasterMetrics.sourceName, registry)
-    (reporterName, reporter)
-  }).toMap
+  val reporters = MetricsReporterLoader.getMetricsReporters(config, SamzaAppMasterMetrics.sourceName).asScala
+  reporters.values.foreach(_.register(SamzaAppMasterMetrics.sourceName, registry))
 
   def start() {
     val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)