SAMZA-1024 Enable support for defining custom configuration for individual monitors
authorShanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Tue, 27 Sep 2016 19:15:56 +0000 (12:15 -0700)
committerJacob Maes <jmaes@linkedin.com>
Tue, 27 Sep 2016 19:15:56 +0000 (12:15 -0700)
15 files changed:
docs/learn/documentation/versioned/rest/monitors.md
docs/learn/documentation/versioned/rest/overview.md
samza-rest/src/main/java/org/apache/samza/monitor/Monitor.java
samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/MonitorFactory.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestConfig.java
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitor.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitorFactory.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitor.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitorFactory.java [new file with mode: 0644]

index 9833068..46678bb 100644 (file)
@@ -20,16 +20,77 @@ title: Monitors
 -->
 
 
-Samza REST supports the ability to add Monitors to the service. The initial implementation is very basic. Monitors are essentially tasks that can be scheduled to run periodically. They do not read the config and they are all scheduled at the same global interval. More capabilities will be added later, but the initial implementation supports simple cases like monitoring the YARN NodeManager and restarting it if it dies.
+Samza REST supports the ability to add Monitors to the service. Monitors are essentially tasks that can be scheduled to run periodically.
+It provides the capability to the users to define configurations that are specific to individual Monitors.
+These configurations are injected into the monitor instances through the Config instances.
+
+## Monitor configuration
+All of the configuration keys for the monitors should be prefixed with monitor.{monitorName}.
+Since each monitor is expected to have an unique name, these prefixes provide the namespacing across
+the monitor configurations.
+
+The following configurations are required for each of the monitors.
+  <table class="table table-condensed table-bordered table-striped">
+        <thead>
+          <tr>
+            <th>Name</th>
+            <th>Default</th>
+            <th>Description</th>
+          </tr>
+        </thead>
+        <tbody>
+          <tr>
+            <td>monitor.monitorName.scheduling.interval.ms</td>
+            <td></td>
+            <td>This defines the periodic scheduling interval in milliseconds
+            for a monitor named monitorName. If this configuration is
+            not defined, it is defaulted to 60 seconds.</td>
+          </tr>
+          <tr>
+            <td>monitor.monitorName.factory.class</td>
+            <td></td>
+            <td>
+            <b>Required:</b> This should contain a fully qualified name
+            of a class that implements the MonitorFactory interface.
+            Monitors that are instantiated by the factory implementation will be scheduled for periodic execution.
+            Custom implementations of the MonitorFactory interface are expected to inject the Config
+            and MetricsRegistry instances available in the createMonitor method into the Monitors.
+            </td>
+          </tr>
+          </tr>
+        </tbody>
+  </table>
+
+  For example, configurations for two monitors named NMTaskMonitor and RMTaskMonitor should be defined as follows.
+
+  {% highlight jproperties %}
+  monitor.RMTaskMonitor.factory.class=org.apache.samza.monitor.RMTaskMonitor
+
+  monitor.RMTaskMonitor.scheduling.interval.ms=1000
+
+  monitor.RMTaskMonitor.custom.config.key1=configValue1
+
+  monitor.NMTaskMonitor.factory.class=org.apache.samza.monitor.NMTaskMonitor
+
+  monitor.NMTaskMonitor.scheduling.interval.ms=2000
+
+  monitor.NMTaskMonitor.custom.config.key2=configValue2
+
+  {% endhighlight %}
 
 ## Implementing a New Monitor
 Implement the [Monitor](javadocs/org/apache/samza/monitor/Monitor.html) interface with some behavior that should be executed periodically. The Monitor is Java code that invokes some method on the SAMZA Rest Service, runs a bash script to restart a failed NodeManager, or cleans old RocksDB sst files left by Host Affinity, for example.
 
+Implement the [MonitorFactory](javadocs/org/apache/samza/monitor/MonitorFactory.html) interface,
+which will be used to instantiate your Monitor. Each Monitor implementation should
+have a associated MonitorFactory implementation, which is responsible for instantiating the monitors.
+
 ## Adding a New Monitor to the Samza REST Service
-Add the fully-qualified class name of the Monitor implementation to the `monitor.classes` property in the service config.
+Add the fully-qualified class name of the MonitorFactory implementation to the `monitor.monitorName.factory.class` property in the service config.
+Set the config key `monitor.monitorName.scheduling.interval.ms` to the scheduling interval in milliseconds.
 
-Set the `monitor.run.interval.ms` property to the appropriate interval. The `monitor()` method will be invoked at this interval.
+The configuration key `monitor.monitorName.scheduling.interval.ms` defines the periodic scheduling interval of
+the `monitor()` method in milli seconds.
 
-For more information on these properties, see the config table in the [Overview page.](overview.html)
 
 ## [Resource Reference &raquo;](resource-directory.html)
index 5b958ac..c382f03 100644 (file)
@@ -79,12 +79,6 @@ job.installations.path=/hello-samza-ROOT/deploy/samza
     <tr>
       <td>rest.resource.classes</td><td></td><td>A comma-delimited list of class names of resources to register with the server. These classes can be instantiated as often as each request, the life cycle is not guaranteed to match the server. Also, the instances do not receive any config. Note that the lifecycle and ability to receive config are the primary differences between resources added via this property versus rest.resource.factory.classes</td>
     </tr>
-    <tr>
-      <td>monitor.classes</td><td></td><td>A comma-delimited list of monitor classes to use. These should be fully-qualified (org.apache.samza...) and must implement the Monitor interface.</td>
-    </tr>
-    <tr>
-      <td>monitor.run.interval.ms</td><td>60000</td><td>The interval at which to call the run() method of each monitor. This one value applies to all monitors. They are not individually configurable.</td>
-    </tr>
   </tbody>
 </table>
 
index d69df5f..0d48213 100644 (file)
@@ -35,5 +35,4 @@ public interface Monitor {
      */
     void monitor()
         throws Exception;
-
 }
\ No newline at end of file
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
new file mode 100644 (file)
index 0000000..6743291
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+
+/**
+ * Configurations for the {@link Monitor} implementations.
+ */
+public class MonitorConfig extends MapConfig {
+
+  public static final String CONFIG_SCHEDULING_INTERVAL = "scheduling.interval.ms";
+
+  public static final String CONFIG_MONITOR_FACTORY_CLASS = "factory.class";
+
+  private static final int DEFAULT_SCHEDULING_INTERVAL_IN_MS = 60000;
+
+  private static final String MONITOR_CONFIG_KEY_SEPARATOR = ".";
+
+  private static final String MONITOR_PREFIX = String.format("monitor%s", MONITOR_CONFIG_KEY_SEPARATOR);
+
+  public MonitorConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   *
+   * Groups configuration defined in the config object for each of the monitors into a MonitorConfig object
+   * @param config contains the entire configuration defined for all the monitors.
+   * @return a map of monitorName, {@link MonitorConfig}, where each MonitorConfig object
+   * contains all the configuration defined for the monitor named monitorName.
+   */
+  public static Map<String, MonitorConfig> getMonitorConfigs(Config config) {
+    Map<String, MonitorConfig> monitorConfigMap = new HashMap<>();
+    Config monitorConfig = config.subset(MONITOR_PREFIX);
+    for (String monitorName : getMonitorNames(monitorConfig)) {
+      monitorConfigMap.put(monitorName,
+                           new MonitorConfig(monitorConfig.subset(monitorName + MONITOR_CONFIG_KEY_SEPARATOR)));
+    }
+    return monitorConfigMap;
+  }
+
+  /**
+   *
+   * @param config contains all the configuration that are defined for the monitors.
+   * @return a unique collection of monitor names for which configuration has been defined in the config object
+   */
+  private static Set<String> getMonitorNames(Config config) {
+    Set<String> monitorNames = new HashSet<>();
+    for (String configKey : config.keySet()) {
+      String[] configKeyComponents = StringUtils.split(configKey, MONITOR_CONFIG_KEY_SEPARATOR);
+      Preconditions.checkState(configKeyComponents.length != 0);
+      String monitorName = configKeyComponents[0];
+      monitorNames.add(monitorName);
+    }
+    return monitorNames;
+  }
+
+  /**
+   *
+   * @return the monitor config factory class name that will be used to create
+   * the monitor instances.
+   */
+  public String getMonitorFactoryClass() {
+    return get(CONFIG_MONITOR_FACTORY_CLASS);
+  }
+
+  /**
+   *
+   * @return the periodic scheduling interval defined in the Config. If the configuration
+   * key is undefined, return the default value({@link MonitorConfig#DEFAULT_SCHEDULING_INTERVAL_IN_MS}).
+   */
+  public int getSchedulingIntervalInMs() {
+    return getInt(CONFIG_SCHEDULING_INTERVAL, DEFAULT_SCHEDULING_INTERVAL_IN_MS);
+  }
+}
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorFactory.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorFactory.java
new file mode 100644 (file)
index 0000000..c38a5d8
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Factory to build {@link org.apache.samza.monitor.Monitor} using provided config.
+ */
+public interface MonitorFactory {
+
+  /**
+   * @param config contains the configuration defined for the monitor.
+   * @param metricsRegistry instance that will allow the monitor
+   *                        implementations to register custom metrics
+   * @return Constructs and returns the monitor instance.
+   * @throws Exception if there was any problem with instantiating the monitor.
+   */
+  Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+    throws Exception;
+}
index 502ecc4..dcf9e57 100644 (file)
  */
 package org.apache.samza.monitor;
 
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.util.ClassLoaderHelper;
 
-import java.lang.reflect.Constructor;
 
-class MonitorLoader {
+public class MonitorLoader {
 
-    private MonitorLoader() {}
-
-    public static Monitor fromClassName(String monitorClassName)
-        throws InstantiationException {
-        Object monitorObject;
-        try {
-            monitorObject = ClassLoaderHelper.fromClassName(monitorClassName);
-        } catch (Exception e) {
-            throw (InstantiationException)
-                new InstantiationException("Unable to instantiate " + monitorClassName).initCause(e);
-        }
-        if (!(monitorObject instanceof Monitor)) {
-            throw new InstantiationException(monitorClassName + " is not an instance of Monitor");
-        }
-        return (Monitor) monitorObject;
-    }
+  /**
+   *
+   * @param monitorConfig contains the configuration defined for a particular monitor.
+   * @param metricsRegistry instance that will be used to register custom metrics.
+   * @return the instantiated monitor object.
+   * @throws InstantiationException when there is a exception instantiating the monitor.
+   */
+  public static Monitor instantiateMonitor(MonitorConfig monitorConfig, MetricsRegistry metricsRegistry)
+      throws InstantiationException {
+      String factoryClass = monitorConfig.getMonitorFactoryClass();
+      try {
+        MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass);
+        return monitorFactory.getMonitorInstance(monitorConfig, metricsRegistry);
+      } catch (Exception e) {
+        throw (InstantiationException)
+            new InstantiationException("Unable to instantiate monitor with factory class " + factoryClass).initCause(e);
+      }
+  }
 }
index 2f4d9dd..ce947f7 100644 (file)
  */
 package org.apache.samza.monitor;
 
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.rest.SamzaRestConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+
+import static org.apache.samza.monitor.MonitorConfig.getMonitorConfigs;
+import static org.apache.samza.monitor.MonitorLoader.instantiateMonitor;
 
 
 /**
@@ -34,22 +38,34 @@ import java.util.List;
  */
 public class SamzaMonitorService {
 
-    private static final Logger log = LoggerFactory.getLogger(SamzaMonitorService.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SamzaMonitorService.class);
 
     private final SchedulingProvider scheduler;
     private final SamzaRestConfig config;
+    private final MetricsRegistry metricsRegistry;
 
-    public SamzaMonitorService(SamzaRestConfig config, SchedulingProvider schedulingProvider) {
-        this.scheduler = schedulingProvider;
+    public SamzaMonitorService(SamzaRestConfig config,
+                               MetricsRegistry metricsRegistry,
+                               SchedulingProvider schedulingProvider) {
         this.config = config;
+        this.metricsRegistry = metricsRegistry;
+        this.scheduler = schedulingProvider;
     }
 
     public void start() {
-        List<Monitor> monitors = getMonitorsFromConfig(config);
-        int monitorRunInterval = config.getConfigMonitorIntervalMs();
-        for (Monitor monitor : monitors) {
-            log.debug("Scheduling monitor {} to run every {}ms", monitor, monitorRunInterval);
-            this.scheduler.schedule(getRunnable(monitor), monitorRunInterval);
+        try {
+            Map<String, MonitorConfig> monitorConfigs = getMonitorConfigs(config);
+            for (Map.Entry<String, MonitorConfig> entry : monitorConfigs.entrySet()) {
+                MonitorConfig monitorConfig = entry.getValue();
+                int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs();
+                LOGGER.info("Scheduling monitor {} to run every {} ms", entry.getKey(), schedulingIntervalInMs);
+                // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics.
+                // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet.
+                scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs);
+            }
+        } catch (InstantiationException e) {
+            LOGGER.error("Exception when instantiating the monitor : ", e);
+            throw new SamzaException(e);
         }
     }
 
@@ -63,33 +79,13 @@ public class SamzaMonitorService {
                 try {
                     monitor.monitor();
                 } catch (IOException e) {
-                    log.warn("Caught IOException during " + monitor.toString() + ".monitor()", e);
+                    LOGGER.error("Caught IOException during " + monitor.toString() + ".monitor()", e);
                 } catch (InterruptedException e) {
-                    log.warn("Caught InterruptedException during " + monitor.toString() + ".monitor()", e);
+                    LOGGER.error("Caught InterruptedException during " + monitor.toString() + ".monitor()", e);
                 } catch (Exception e) {
-                    log.warn("Unexpected exception during {}.monitor()", monitor, e);
+                    LOGGER.error("Unexpected exception during {}.monitor()", monitor, e);
                 }
             }
         };
     }
-
-    /**
-     * Get all the registered monitors for the service.
-     * @return a list of Monitor objects ready to be scheduled.
-     */
-    private static List<Monitor> getMonitorsFromConfig(SamzaRestConfig config) {
-        List<String> classNames = config.getConfigMonitorClassList();
-        List<Monitor> monitors = new ArrayList<>();
-
-        for (String name: classNames) {
-            try {
-                Monitor monitor = MonitorLoader.fromClassName(name);
-                monitors.add(monitor);
-            } catch (InstantiationException e) {
-                log.warn("Unable to instantiate monitor " + name, e);
-            }
-        }
-        return monitors;
-    }
-
 }
index aea1a92..627a333 100644 (file)
  */
 package org.apache.samza.monitor;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
 /**
  * Provides scheduling functionality to the SamzaMonitorService.
  */
index 6f5c10a..47b0663 100644 (file)
@@ -44,23 +44,6 @@ public class SamzaRestConfig extends MapConfig {
   public static final String CONFIG_REST_RESOURCE_CLASSES = "rest.resource.classes";
 
   /**
-   * Specifies a comma-delimited list of class names corresponding to Monitor implementations.
-   * These will be instantiated and scheduled to run periodically at runtime.
-   * Note that you must include the ENTIRE package name (org.apache.samza...).
-   */
-  public static final String CONFIG_MONITOR_CLASSES = "monitor.classes";
-
-  /**
-   * Specifies the interval at which each registered Monitor's monitor method will be called.
-   */
-  public static final String CONFIG_MONITOR_INTERVAL_MS = "monitor.run.interval.ms";
-
-  /**
-   * Monitors run every 60s by default
-   */
-  private static final int DEFAULT_MONITOR_INTERVAL = 60000;
-
-  /**
    * The port number to use for the HTTP server or 0 to dynamically choose a port.
    */
   public static final String CONFIG_SAMZA_REST_SERVICE_PORT = "services.rest.port";
@@ -97,23 +80,6 @@ public class SamzaRestConfig extends MapConfig {
   }
 
   /**
-   * @see SamzaRestConfig#CONFIG_MONITOR_CLASSES
-   * @return a list of class names as Strings corresponding to Monitors that
-   *          Samza REST should schedule or an empty list if none were configured.
-   */
-  public List<String> getConfigMonitorClassList() {
-    return parseCommaDelimitedStrings(get(CONFIG_MONITOR_CLASSES));
-  }
-
-  /**
-   * @see SamzaRestConfig#CONFIG_MONITOR_INTERVAL_MS
-   * @return an integer number of milliseconds, the period at which to schedule monitor runs.
-   */
-  public int getConfigMonitorIntervalMs() {
-    return getInt(CONFIG_MONITOR_INTERVAL_MS, DEFAULT_MONITOR_INTERVAL);
-  }
-
-  /**
    * Parses a string containing a set of comma-delimited strings. Whitespace is ignored.
    * If the input string is null or empty, an empty list is returned.
    *
index 5b34da8..2a3e83a 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.monitor.SamzaMonitorService;
 import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider;
 import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -83,7 +84,9 @@ public class SamzaRestService {
       // Schedule monitors to run
       ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1);
       ScheduledExecutorSchedulingProvider schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService);
-      SamzaMonitorService monitorService = new SamzaMonitorService(config, schedulingProvider);
+      SamzaMonitorService monitorService = new SamzaMonitorService(config,
+                                                                   new NoOpMetricsRegistry(),
+                                                                   schedulingProvider);
       monitorService.start();
 
       restService.runBlocking();
index 1da3430..4618b54 100644 (file)
  */
 package org.apache.samza.monitor;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.monitor.mock.ExceptionThrowingMonitor;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.monitor.mock.DummyMonitorFactory;
+import org.apache.samza.monitor.mock.ExceptionThrowingMonitorFactory;
 import org.apache.samza.monitor.mock.InstantSchedulingProvider;
 import org.apache.samza.rest.SamzaRestConfig;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.samza.monitor.MonitorConfig.CONFIG_MONITOR_FACTORY_CLASS;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class TestMonitorService {
 
+    private static final MetricsRegistry METRICS_REGISTRY = new NoOpMetricsRegistry();
+
     @Test
-    public void testGetMonitorsFromClassName() {
-        // Test that monitors are instantiated properly from config strings.
+    public void testMonitorsShouldBeInstantiatedProperly() {
+        // Test that a monitor should be instantiated properly by invoking
+        // the appropriate factory method.
+        Map<String, String> configMap = ImmutableMap.of(CONFIG_MONITOR_FACTORY_CLASS,
+                                                        DummyMonitorFactory.class.getCanonicalName());
         Monitor monitor = null;
         try {
-            monitor = MonitorLoader.fromClassName("org.apache.samza.monitor.mock.DummyMonitor");
+            monitor = MonitorLoader.instantiateMonitor(new MonitorConfig(new MapConfig(configMap)),
+                                                       METRICS_REGISTRY);
         } catch (InstantiationException e) {
             fail();
         }
-
-        // Object should implement monitor().
+        assertNotNull(monitor);
+        // Object should implement the monitor().
         try {
             monitor.monitor();
         } catch (Exception e) {
@@ -55,17 +68,33 @@ public class TestMonitorService {
     }
 
     @Test
+    public void testShouldGroupRelevantMonitorConfigTogether() {
+        // Test that Monitor Loader groups relevant config together.
+        Map<String, String> firstMonitorConfig = ImmutableMap.of("monitor.monitor1.factory.class",
+                                                                 "org.apache.samza.monitor.DummyMonitor",
+                                                                 "monitor.monitor1.scheduling.interval.ms",
+                                                                 "100");
+        Map<String, String> secondMonitorConfig = ImmutableMap.of("monitor.monitor2.factory.class",
+                                                                  "org.apache.samza.monitor.DummyMonitor",
+                                                                  "monitor.monitor2.scheduling.interval.ms",
+                                                                  "200");
+        MapConfig mapConfig = new MapConfig(ImmutableList.of(firstMonitorConfig, secondMonitorConfig));
+        MonitorConfig expectedFirstConfig = new MonitorConfig(new MapConfig(firstMonitorConfig).subset("monitor.monitor1."));
+        MonitorConfig expectedSecondConfig = new MonitorConfig(new MapConfig(secondMonitorConfig).subset("monitor.monitor2."));
+        Map<String, MonitorConfig> expected = ImmutableMap.of("monitor1", expectedFirstConfig, "monitor2", expectedSecondConfig);
+        assertEquals(expected, MonitorConfig.getMonitorConfigs(mapConfig));
+    }
+
+    @Test
     public void testMonitorExceptionIsolation() {
         // Test that an exception from a monitor doesn't bubble up out of the scheduler.
-        Monitor monitor = new ExceptionThrowingMonitor();
-        InstantSchedulingProvider provider = new InstantSchedulingProvider();
-
-        // Initialize with a monitor that immediately throws an exception when run.
-        Map<String, String> map = new HashMap<>();
-        map.put(SamzaRestConfig.CONFIG_MONITOR_CLASSES, "org.apache.samza.monitor.mock.ExceptionThrowingMonitor");
-        map.put(SamzaRestConfig.CONFIG_MONITOR_INTERVAL_MS, "1");
-        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(map));
-        SamzaMonitorService monitorService = new SamzaMonitorService(config, provider);
+        Map<String, String> configMap =
+            ImmutableMap.of(String.format("monitor.name.%s", CONFIG_MONITOR_FACTORY_CLASS),
+                            ExceptionThrowingMonitorFactory.class.getCanonicalName());
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config,
+                                                                     METRICS_REGISTRY,
+                                                                     new InstantSchedulingProvider());
 
         // This will throw if the exception isn't caught within the provider.
         monitorService.start();
index 8621db1..c6a2b28 100644 (file)
@@ -22,6 +22,7 @@ import org.apache.samza.monitor.Monitor;
 
 public class DummyMonitor implements Monitor {
 
+    @Override
     public void monitor() {
         // Do nothing!
     }
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitorFactory.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/DummyMonitorFactory.java
new file mode 100644 (file)
index 0000000..ccc534e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.monitor.Monitor;
+import org.apache.samza.monitor.MonitorConfig;
+import org.apache.samza.monitor.MonitorFactory;
+
+
+public class DummyMonitorFactory implements MonitorFactory {
+
+  @Override
+  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+      throws Exception {
+    return new DummyMonitor();
+  }
+}
index c4f3f73..035e2ed 100644 (file)
 package org.apache.samza.monitor.mock;
 
 import org.apache.samza.monitor.Monitor;
-
 import java.io.IOException;
 
+
 public class ExceptionThrowingMonitor implements Monitor {
+
+    @Override
     public void monitor() throws IOException {
         throw new IOException("I don't know what I was expecting.");
     }
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitorFactory.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/ExceptionThrowingMonitorFactory.java
new file mode 100644 (file)
index 0000000..af414b4
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.monitor.Monitor;
+import org.apache.samza.monitor.MonitorConfig;
+import org.apache.samza.monitor.MonitorFactory;
+
+
+public class ExceptionThrowingMonitorFactory implements MonitorFactory {
+
+  @Override
+  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry) {
+    return new ExceptionThrowingMonitor();
+  }
+}