SAMZA-1039 Selective loading of monitors in samza rest
authorShanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Wed, 19 Oct 2016 18:42:36 +0000 (11:42 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 19 Oct 2016 18:45:03 +0000 (11:45 -0700)
build.gradle
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java [new file with mode: 0644]

index 98839f2..ac0213e 100644 (file)
@@ -556,6 +556,7 @@ project(":samza-rest") {
 
     testCompile "junit:junit:$junitVersion"
     testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
 
   tasks.create(name: "releaseRestServiceTar", type: Tar) {
index ce947f7..754ad82 100644 (file)
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.monitor;
 
+import com.google.common.base.Strings;
 import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -56,12 +57,19 @@ public class SamzaMonitorService {
         try {
             Map<String, MonitorConfig> monitorConfigs = getMonitorConfigs(config);
             for (Map.Entry<String, MonitorConfig> entry : monitorConfigs.entrySet()) {
+                String monitorName = entry.getKey();
                 MonitorConfig monitorConfig = entry.getValue();
-                int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs();
-                LOGGER.info("Scheduling monitor {} to run every {} ms", entry.getKey(), schedulingIntervalInMs);
-                // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics.
-                // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet.
-                scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs);
+
+                if (!Strings.isNullOrEmpty(monitorConfig.getMonitorFactoryClass())) {
+                    int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs();
+                    LOGGER.info("Scheduling monitor {} to run every {} ms", monitorName, schedulingIntervalInMs);
+                    // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics.
+                    // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet.
+                    scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs);
+                } else {
+                  // When MonitorFactoryClass is not defined in the config, ignore the monitor config
+                  LOGGER.warn("Not scheduling the monitor: {} to run, since monitor factory class is not set in config.", monitorName);
+                }
             }
         } catch (InstantiationException e) {
             LOGGER.error("Exception when instantiating the monitor : ", e);
index 4618b54..e75f494 100644 (file)
@@ -21,11 +21,13 @@ package org.apache.samza.monitor;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.monitor.mock.DummyMonitorFactory;
 import org.apache.samza.monitor.mock.ExceptionThrowingMonitorFactory;
 import org.apache.samza.monitor.mock.InstantSchedulingProvider;
+import org.apache.samza.monitor.mock.MockMonitorFactory;
 import org.apache.samza.rest.SamzaRestConfig;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
@@ -34,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.mockito.Mockito;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.samza.monitor.MonitorConfig.CONFIG_MONITOR_FACTORY_CLASS;
@@ -102,6 +105,41 @@ public class TestMonitorService {
     }
 
     @Test
+    public void testShouldNotFailWhenTheMonitorFactoryClassIsNotDefined()
+        throws Exception {
+        // Test that when MonitorFactoryClass is not defined in the config, monitor service
+        // should not fail.
+        Map<String, String> configMap = ImmutableMap.of("monitor.monitor1.config.key1", "configValue1",
+                                                        "monitor.monitor1.config.key2", "configValue2",
+                                                        String.format("monitor.MOCK_MONITOR.%s", CONFIG_MONITOR_FACTORY_CLASS),
+                                                        MockMonitorFactory.class.getCanonicalName());
+
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config,
+                                                                     METRICS_REGISTRY,
+                                                                     new InstantSchedulingProvider());
+        try {
+            monitorService.start();
+        } catch (Exception e) {
+            fail();
+        }
+        Mockito.verify(MockMonitorFactory.MOCK_MONITOR, Mockito.times(1)).monitor();
+    }
+
+    @Test(expected = SamzaException.class)
+    public void testShouldFailWhenTheMonitorFactoryClassIsInvalid() {
+        // Test that when MonitorFactoryClass is defined in the config and is invalid,
+        // monitor service should fail. Should throw back SamzaException.
+        Map<String, String> configMap = ImmutableMap.of(String.format("monitor.name.%s", CONFIG_MONITOR_FACTORY_CLASS),
+                                                        "RandomClassName");
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config,
+                                                                     METRICS_REGISTRY,
+                                                                     new InstantSchedulingProvider());
+        monitorService.start();
+    }
+
+    @Test
     public void testScheduledExecutorSchedulingProvider() {
         // Test that the monitor is scheduled by the ScheduledExecutorSchedulingProvider
         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java
new file mode 100644 (file)
index 0000000..5b19001
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.monitor.Monitor;
+import org.apache.samza.monitor.MonitorConfig;
+import org.apache.samza.monitor.MonitorFactory;
+import org.mockito.Mockito;
+
+
+public class MockMonitorFactory implements MonitorFactory {
+
+  public static final Monitor MOCK_MONITOR = Mockito.mock(Monitor.class);
+
+  @Override
+  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+      throws Exception {
+    Mockito.reset(MOCK_MONITOR);
+    return MOCK_MONITOR;
+  }
+}