SAMZA-1324: Fix NullPointerException in ZkUtils api's.
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Wed, 12 Jul 2017 18:13:09 +0000 (11:13 -0700)
committernavina <navina@apache.org>
Wed, 12 Jul 2017 18:13:09 +0000 (11:13 -0700)
Problem:
Read/Write api methods in ZkUtils updates counters/timers in `metrics` field. In a ZkUtils constructor this fields is not initialized properly. Java default for uninitialized field is null resulting in NPE.

Fix:
Initialize private fields of ZkUtils class with appropriate defaults.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #235 from shanthoosh/fix_zkutils_api

docs/learn/documentation/versioned/container/metrics-table.html
samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index 7fbbc40..e504fa3 100644 (file)
     <li><a href="#bootstrapping-chooser-metrics">BootstrappingChooserMetrics</a></li>
     <li><a href="#hdfs-system-producer-metrics">HdfsSystemProducerMetrics</a></li>
     <li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li>
+    <li><a href="#zookeeper-client-metrics">ZookeeperClientMetrics</a></li>
     <li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li>
 </ul>
 <p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p>
     </tr>
 
     <tr>
-        <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th>
+        <th colspan="2" class="section" id="zookeeper-client-metrics">org.apache.samza.zk.ZkUtilsMetrics</th>
     </tr>
     <tr>
         <td>reads</td>
         <td>Number of subscriptions to znodes in Zookeeper</td>
     </tr>
     <tr>
-        <td>zk-connection-error</td>
+        <td>zk-connection-errors</td>
         <td>Number of Zookeeper connection errors</td>
     </tr>
     <tr>
+        <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th>
+    </tr>
+    <tr>
         <td>is-leader</td>
         <td>Denotes if the processor is a leader or not</td>
     </tr>
index 20fcfa4..d0633a8 100644 (file)
@@ -25,13 +25,14 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
-  private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
 
   public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
@@ -39,7 +40,7 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory
     ZkClient zkClient =
         createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
 
-    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry());
 
     return new ZkCoordinationUtils(participantId, zkConfig, zkUtils);
   }
index f2fc3de..94c3054 100644 (file)
@@ -80,7 +80,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
             zkConfig.getZkConnect(),
             zkConfig.getZkSessionTimeoutMs(),
             zkConfig.getZkConnectionTimeoutMs()),
-        zkConfig.getZkConnectionTimeoutMs(), metrics);
+        zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
 
     this.processorId = createProcessorId(config);
     LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
index 3437602..3d00897 100644 (file)
@@ -31,11 +31,6 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
 
   private final MetricsRegistry metricsRegistry;
 
-  public final Counter reads;
-  public final Counter writes;
-  public final Counter subscriptions;
-  public final Counter zkConnectionError;
-
   /**
    * Denotes if the processor is a leader or not
    */
@@ -65,10 +60,6 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
   public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) {
     super(metricsRegistry);
     this.metricsRegistry = metricsRegistry;
-    this.reads = newCounter("reads");
-    this.writes = newCounter("writes");
-    this.subscriptions = newCounter("subscriptions");
-    this.zkConnectionError = newCounter("zk-connection-error");
     this.isLeader = newGauge("is-leader", false);
     this.barrierCreation = newCounter("barrier-creation");
     this.barrierStateChange = newCounter("barrier-state-change");
index aa55ff7..ecf118b 100644 (file)
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.zookeeper.data.Stat;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -65,19 +66,13 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
-  private ZkJobCoordinatorMetrics metrics;
+  private final ZkUtilsMetrics metrics;
 
-  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.zkClient = zkClient;
-  }
-
-  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, ZkJobCoordinatorMetrics metrics) {
-    this.keyBuilder = zkKeyBuilder;
-    this.connectionTimeoutMs = connectionTimeoutMs;
-    this.zkClient = zkClient;
-    this.metrics = metrics;
+    this.metrics = new ZkUtilsMetrics(metricsRegistry);
   }
 
   public void connect() throws ZkInterruptedException {
@@ -269,7 +264,9 @@ public class ZkUtils {
    * @return jobmodel version as a string
    */
   public String getJobModelVersion() {
-    return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
+    String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath());
+    metrics.reads.inc();
+    return jobModelVersion;
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
new file mode 100644 (file)
index 0000000..b9f4aa8
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Contains all the metrics published by {@link ZkUtils}.
+ */
+public class ZkUtilsMetrics extends MetricsBase {
+  /**
+   * Number of data reads from zookeeper.
+   */
+  public final Counter reads;
+
+  /**
+   * Number of data writes into zookeeper.
+   */
+  public final Counter writes;
+
+  /**
+   * Number of subscriptions created with zookeeper.
+   */
+  public final Counter subscriptions;
+
+  /**
+   * Number of zookeeper connection errors in ZkClient.
+   */
+  public final Counter zkConnectionError;
+
+  public ZkUtilsMetrics(MetricsRegistry metricsRegistry) {
+    super(metricsRegistry);
+    this.reads = newCounter("reads");
+    this.writes = newCounter("writes");
+    this.subscriptions = newCounter("subscriptions");
+    this.zkConnectionError = newCounter("zk-connection-errors");
+  }
+}
index 49cd280..3dd1bd5 100644 (file)
@@ -53,9 +53,9 @@ public class TestZkBarrierForVersionUpgrade {
   @Before
   public void testSetup() {
     ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
-    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
     ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
-    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 
   @After
index 993297b..3ff9175 100644 (file)
@@ -437,6 +437,6 @@ public class TestZkLeaderElector {
     return new ZkUtils(
         KEY_BUILDER,
         zkClient,
-        CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+        CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 }
index 9f089a0..b2a5533 100644 (file)
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -219,6 +220,7 @@ public class TestZkProcessorLatch {
     return new ZkUtils(
         KEY_BUILDER,
         zkClient,
-        CONNECTION_TIMEOUT_MS);
+        CONNECTION_TIMEOUT_MS,
+        new NoOpMetricsRegistry());
   }
 }
index 9e33484..a33bf03 100644 (file)
@@ -71,7 +71,7 @@ public class TestZkUtils {
     zkUtils = new ZkUtils(
         KEY_BUILDER,
         zkClient,
-        SESSION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
+        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
 
     zkUtils.connect();
   }
@@ -110,7 +110,7 @@ public class TestZkUtils {
     zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
     List<String> l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(1, l.size());
-    new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS).registerProcessorAndGetId(new ProcessorData("host2", "2"));
+    new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2"));
     l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(2, l.size());
 
index 4865647..2d5da2b 100644 (file)
@@ -54,7 +54,6 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
-import org.apache.samza.zk.ZkJobCoordinatorMetrics;
 import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
 import org.junit.Rule;
@@ -94,7 +93,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private LocalApplicationRunner applicationRunner1;
   private LocalApplicationRunner applicationRunner2;
   private LocalApplicationRunner applicationRunner3;
-  private ZkJobCoordinatorMetrics zkJobCoordinatorMetrics;
 
   // Set 90 seconds as max execution time for each test.
   @Rule
@@ -110,8 +108,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
     ZkClient zkClient = new ZkClient(zkConnect());
     ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId));
-    zkJobCoordinatorMetrics = new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry());
-    zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, zkJobCoordinatorMetrics);
+    zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
     zkUtils.connect();
 
     // Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId.