SAMZA-1304: Handling duplicate stream processor registration.
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Thu, 13 Jul 2017 23:19:56 +0000 (16:19 -0700)
committernavina <navina@apache.org>
Thu, 13 Jul 2017 23:19:56 +0000 (16:19 -0700)
When a stream processor registers with the same processorId as an already existing
processor in the processor group, its registration should fail.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Navina Ramesh <navina@apache.org>, Jagadish V <jvenkatr@linkedin.com>

Closes #240 from shanthoosh/standalone_duplicate_processor_fix

samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index 3f4fd0b..a48a450 100644 (file)
 
 package org.apache.samza.zk;
 
+import java.util.Objects;
 import org.apache.samza.SamzaException;
 
-
+/**
+ * Represents processor data stored in zookeeper processors node.
+ */
 public class ProcessorData {
   private final String processorId;
   private final String host;
@@ -51,4 +54,17 @@ public class ProcessorData {
   public String getProcessorId() {
     return processorId;
   }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorId, host); // Hashes the same two fields that equals() compares.
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (getClass() != obj.getClass()) return false; // Exact runtime-class match: an instance of a different class never compares equal.
+    final ProcessorData other = (ProcessorData) obj;
+    return Objects.equals(processorId, other.processorId) && Objects.equals(host, other.host); // Null-safe comparison of both fields.
+  }
 }
index 94c3054..8ca26c8 100644 (file)
@@ -20,8 +20,10 @@ package org.apache.samza.zk;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -162,8 +164,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   void doOnProcessorChange(List<String> processors) {
     // if list of processors is empty - it means we are called from 'onBecomeLeader'
-    // TODO: Handle empty currentProcessorIds or duplicate processorIds in the list
+    // TODO: Handle empty currentProcessorIds.
     List<String> currentProcessorIds = getActualProcessorIds(processors);
+    Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds);
+
+    if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
+      LOG.info("Processors: {} has duplicates. Not generating job model.", currentProcessorIds);
+      return;
+    }
 
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
index ecf118b..7406cf5 100644 (file)
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.job.model.JobModel;
@@ -83,10 +85,6 @@ public class ZkUtils {
     }
   }
 
-  public static ZkConnection createZkConnection(String zkConnectString, int sessionTimeoutMs) {
-    return new ZkConnection(zkConnectString, sessionTimeoutMs);
-  }
-
   ZkClient getZkClient() {
     return zkClient;
   }
@@ -105,17 +103,69 @@ public class ZkUtils {
    * @return String representing the absolute ephemeralPath of this client in the current session
    */
   public synchronized String registerProcessorAndGetId(final ProcessorData data) {
+    String processorId = data.getProcessorId();
     if (ephemeralPath == null) {
-      ephemeralPath =
-          zkClient.createEphemeralSequential(
-              keyBuilder.getProcessorsPath() + "/", data.toString());
-
-      LOG.info("newly generated path for " + data +  " is " +  ephemeralPath);
-      return ephemeralPath;
+      ephemeralPath = zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/", data.toString());
+      LOG.info("Created ephemeral path: {} for processor: {} in zookeeper.", ephemeralPath, data);
+      ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath);
+      // The node is created first and validated afterwards: if it is not the valid node for its processorId, the registration is rolled back below.
+      if (!isValidRegisteredProcessor(processorNode)) {
+        LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, ephemeralPath);
+        zkClient.delete(ephemeralPath); // NOTE(review): ephemeralPath is not reset to null here, so a later call on this instance would take the else branch and return the deleted path - confirm whether it should be cleared.
+        throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId));
+      }
     } else {
-      LOG.info("existing path for " + data +  " is " +  ephemeralPath);
-      return ephemeralPath;
+      LOG.info("Ephemeral path: {} exists for processor: {} in zookeeper.", ephemeralPath, data);
     }
+    return ephemeralPath;
+  }
+
+  /**
+   * Determines whether the given registered processor node is the valid one for its processorId.
+   *
+   * If several nodes are registered with the same processorId, only the node with the
+   * lexicographically smallest zookeeper path is considered valid; all the remaining
+   * nodes are treated as duplicates and are invalid.
+   *
+   * The smallest path is expected to be unique, because zookeeper assigns a distinct
+   * sequential id to each ephemeral sequential node.
+   *
+   * @param processor the registered processor node to validate against the processors group.
+   * @return true if no other node shares its processorId, or if it owns the smallest path. false otherwise.
+   */
+  private boolean isValidRegisteredProcessor(final ProcessorNode processor) {
+    String processorId = processor.getProcessorData().getProcessorId();
+    List<ProcessorNode> processorNodes = getAllProcessorNodes().stream()
+                                                               .filter(processorNode -> processorNode.processorData.getProcessorId().equals(processorId))
+                                                               .collect(Collectors.toList());
+    // Duplicate check: more than one node is registered for this processorId.
+    if (processorNodes.size() > 1) {
+      // More than one processor exists for the provided processorId; log the competing nodes.
+      LOG.debug("Processor nodes in zookeeper: {} for processorId: {}.", processorNodes, processorId);
+      // Collect the ephemeral processor paths into a TreeSet so that the smallest path comes first.
+      TreeSet<String> sortedProcessorPaths = processorNodes.stream()
+                                                           .map(ProcessorNode::getEphemeralPath)
+                                                           .collect(Collectors.toCollection(TreeSet::new));
+      // This processor is valid only if it owns the smallest path.
+      return sortedProcessorPaths.first().equals(processor.getEphemeralPath());
+    }
+    // No duplicate processors were found, so this is a valid registered processor.
+    return true;
+  }
+
+  /**
+   * Fetches all the ephemeral processor nodes of a standalone job from zookeeper.
+   *
+   * For every active processor znode, the full path is built under the processors path,
+   * the data stored at that path is read, and it is parsed into a {@link ProcessorData}.
+   *
+   * @return a list of {@link ProcessorNode}, where each ProcessorNode represents a registered stream processor.
+   */
+  private List<ProcessorNode> getAllProcessorNodes() {
+    List<String> processorZNodes = getSortedActiveProcessorsZnodes();
+    LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes);
+    return processorZNodes.stream()
+                          .map(processorZNode -> {
+                              String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode);
+                              String data = readProcessorData(ephemeralProcessorPath);
+                              return new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath);
+                            }).collect(Collectors.toList());
   }
 
   /**
@@ -321,4 +371,45 @@ public class ZkUtils {
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
     metrics.subscriptions.inc();
   }
+
+  /**
+   * Represents a zookeeper processor node: the {@link ProcessorData} of a processor
+   * paired with the zookeeper path that the processor is registered under.
+   * Both fields are final and take part in equals, hashCode and toString.
+   */
+  private static class ProcessorNode {
+    private final ProcessorData processorData;
+
+    // Path of the processor's node in zookeeper. Ex: /test/processors/0000000000
+    private final String ephemeralProcessorPath;
+
+    ProcessorNode(ProcessorData processorData, String ephemeralProcessorPath) {
+      this.processorData = processorData;
+      this.ephemeralProcessorPath = ephemeralProcessorPath;
+    }
+
+    ProcessorData getProcessorData() {
+      return processorData;
+    }
+
+    String getEphemeralPath() {
+      return ephemeralProcessorPath;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("[ProcessorData: %s, ephemeralProcessorPath: %s]", processorData, ephemeralProcessorPath);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(processorData, ephemeralProcessorPath); // Hashes the same two fields that equals() compares.
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false; // Exact runtime-class match is required.
+      final ProcessorNode other = (ProcessorNode) obj;
+      return Objects.equals(processorData, other.processorData) && Objects.equals(ephemeralProcessorPath, other.ephemeralProcessorPath); // Null-safe comparison of both fields.
+    }
+  }
 }
index a33bf03..e7a9aa2 100644 (file)
@@ -37,6 +37,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 import org.junit.Test;
 
 public class TestZkUtils {
@@ -47,6 +49,10 @@ public class TestZkUtils {
   private static final int CONNECTION_TIMEOUT_MS = 10000;
   private ZkUtils zkUtils;
 
+  @Rule
+  // Declared public to honor junit contract.
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @BeforeClass
   public static void setup() throws InterruptedException {
     zkServer = new EmbeddedZookeeper();
@@ -68,10 +74,7 @@ public class TestZkUtils {
       // Do nothing
     }
 
-    zkUtils = new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    zkUtils = getZkUtils();
 
     zkUtils.connect();
   }
@@ -82,6 +85,11 @@ public class TestZkUtils {
     zkClient.close();
   }
 
+  private ZkUtils getZkUtils() { // Creates a new ZkUtils instance backed by the test's zkClient.
+    return new ZkUtils(KEY_BUILDER, zkClient,
+                       SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+  }
+
   @AfterClass
   public static void teardown() {
     zkServer.teardown();
@@ -174,6 +182,26 @@ public class TestZkUtils {
     Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
   }
 
+  /**
+   * Registers two processors that share the same processorId, each through its own ZkUtils instance.
+   * The first registration should succeed; the second should fail with a {@code SamzaException}.
+   */
+  @Test
+  public void testRegisterProcessorAndGetIdShouldFailForDuplicateProcessorRegistration() {
+    final String testHostName = "localhost";
+    final String testProcessId = "testProcessorId";
+    ProcessorData processorData1 = new ProcessorData(testHostName, testProcessId);
+    // Register processor 1, which is not a duplicate; this registration should succeed.
+    zkUtils.registerProcessorAndGetId(processorData1);
+
+    ZkUtils zkUtils1 = getZkUtils(); // A second ZkUtils instance is used for the duplicate registration.
+    zkUtils1.connect();
+    ProcessorData duplicateProcessorData = new ProcessorData(testHostName, testProcessId);
+    // Registration of the duplicate processor should fail; the expectation must be declared before the throwing call.
+    expectedException.expect(SamzaException.class);
+    zkUtils1.registerProcessorAndGetId(duplicateProcessorData);
+  }
+
   @Test
   public void testPublishNewJobModel() {
     ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
index 2d5da2b..77e2a49 100644 (file)
@@ -58,6 +58,7 @@ import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,6 +99,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   @Rule
   public Timeout testTimeOutInMillis = new Timeout(90000);
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Override
   public void setUp() {
     super.setUp();
@@ -224,9 +228,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     assertEquals("2", currentJobModelVersion);
   }
 
-  // Checks enforcing property that all processors should have unique Id.
-  // Depends upon SAMZA-1302
-  // @Test(expected = Exception.class)
+  @Test
   public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() throws InterruptedException {
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -253,10 +255,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
     kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch);
+    // Fail when the duplicate processor joins.
+    expectedException.expect(SamzaException.class);
     applicationRunner3.run(streamApp3);
-
-    // The following line should throw up by handling duplicate processorId registration.
-    kafkaEventsConsumedLatch.await();
   }
 
   // Depends upon SAMZA-1302