SAMZA-1165; cleanup old zk versions.
authorBoris Shkolnik <boryas@apache.org>
Fri, 21 Jul 2017 20:43:18 +0000 (13:43 -0700)
committerJagadish <jagadish@apache.org>
Fri, 21 Jul 2017 20:43:18 +0000 (13:43 -0700)
Author: Boris Shkolnik <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Navina <navina@apache.org>, Shanthoosh V<svenkata@linkedin.com>

Closes #239 from sborya/zkCleanUpBarrier1

samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index fc9f165..0bf078e 100644 (file)
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters;
 
 public class TaskConfigJava extends MapConfig {
   // Task Configs
-  private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+  public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
   public static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
 
   // broadcast streams consumed by all tasks. e.g. kafka.foo#1
index 9b8ea66..6174063 100644 (file)
@@ -48,6 +48,14 @@ public class ScheduleAfterDebounceTime {
   // Action name when the Processor membership changes
   public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
 
+  /**
+   *
+   * cleanup process is started after every new job model generation is complete.
+   * It deletes old versions of job model and the barrier.
+   * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE.
+   **/
+  public static final String ON_ZK_CLEANUP = "OnCleanUp";
+
   private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
 
   private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
index 196e431..3257ee1 100644 (file)
@@ -117,9 +117,7 @@ public class ZkBarrierForVersionUpgrade {
    * @param version Version associated with the Barrier
    */
   public void expire(String version) {
-    zkUtils.writeData(
-        keyBuilder.getBarrierStatePath(version),
-        State.TIMED_OUT);
+    zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT);
 
   }
   /**
@@ -222,4 +220,7 @@ public class ZkBarrierForVersionUpgrade {
     }
   }
 
+  public static int getVersion(String barrierPath) {
+    return Integer.valueOf(barrierPath.substring(barrierPath.lastIndexOf('_') + 1));
+  }
 }
index 298c96e..dd08e3f 100644 (file)
@@ -60,6 +60,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
   // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
   private static final int METADATA_CACHE_TTL_MS = 5000;
+  private static final int NUM_VERSIONS_TO_LEAVE = 10;
 
   private final ZkUtils zkUtils;
   private final String processorId;
@@ -202,6 +203,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
 
     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
+
+    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE));
   }
 
   @Override
index 8b6bc52..5df7114 100644 (file)
@@ -22,6 +22,7 @@ package org.apache.samza.zk;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.TreeSet;
@@ -32,6 +33,7 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -459,6 +461,66 @@ public class ZkUtils {
   }
 
   /**
+   * cleanup old data from ZK
+   * @param numVersionsToLeave - number of versions to leave
+   */
+  public void cleanupZK(int numVersionsToLeave) {
+    deleteOldBarrierVersions(numVersionsToLeave);
+    deleteOldJobModels(numVersionsToLeave);
+  }
+
+  void deleteOldJobModels(int numVersionsToLeave) {
+    // read current list of JMs
+    String path = keyBuilder.getJobModelPathPrefix();
+    LOG.info("about to delete jm path=" + path);
+    List<String> znodeIds = zkClient.getChildren(path);
+    deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        // jm version name format is <num>
+        return Integer.valueOf(o1) - Integer.valueOf(o2);
+      }
+    });
+  }
+
+  void deleteOldBarrierVersions(int numVersionsToLeave) {
+    // read current list of barriers
+    String path = keyBuilder.getJobModelVersionBarrierPrefix();
+    LOG.info("about to delete old barrier paths from " + path);
+    List<String> znodeIds = zkClient.getChildren(path);
+    LOG.info("List of all zkNodes: " + znodeIds);
+    deleteOldVersionPath(path, znodeIds, numVersionsToLeave,  new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        // barrier's name format is barrier_<num>
+        return ZkBarrierForVersionUpgrade.getVersion(o1) - ZkBarrierForVersionUpgrade.getVersion(o2);
+      }
+    });
+  }
+
+  void deleteOldVersionPath(String path, List<String> zNodeIds, int numVersionsToLeave, Comparator<String> c) {
+    if (StringUtils.isEmpty(path) || zNodeIds == null) {
+      LOG.warn("cannot cleanup empty path or empty list in ZK");
+      return;
+    }
+    if (zNodeIds.size() > numVersionsToLeave) {
+      Collections.sort(zNodeIds, c);
+      // get the znodes to delete
+      int size = zNodeIds.size();
+      List<String> zNodesToDelete = zNodeIds.subList(0, zNodeIds.size() - numVersionsToLeave);
+      LOG.info("Starting cleanup of barrier version zkNodes. From size=" + size + " to size " + zNodesToDelete.size() + "; numberToLeave=" + numVersionsToLeave);
+      for (String znodeId : zNodesToDelete) {
+        String pathToDelete = path + "/" + znodeId;
+        try {
+          LOG.info("deleting " + pathToDelete);
+          zkClient.deleteRecursive(pathToDelete);
+        } catch (Exception e) {
+          LOG.warn("delete of node " + pathToDelete + " failed.", e);
+        }
+      }
+    }
+  }
+  /**
    * Represents zookeeper processor node.
    */
   private static class ProcessorNode {
index e7a9aa2..b5953d1 100644 (file)
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,8 +42,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
-import org.junit.rules.ExpectedException;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
@@ -102,7 +106,6 @@ public class TestZkUtils {
 
     // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
     Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath));
-
   }
 
   @Test
@@ -237,6 +240,111 @@ public class TestZkUtils {
     Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
   }
 
+  @Test
+  public void testCleanUpZkJobModels() {
+    String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
+    System.out.println("root=" + root);
+    zkUtils.getZkClient().createPersistent(root, true);
+
+    // generate multiple version
+    for (int i = 101; i < 110; i++) {
+      zkUtils.publishJobModel(String.valueOf(i), null);
+    }
+
+    // clean all of the versions except 5 most recent ones
+    zkUtils.deleteOldJobModels(5);
+    Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), zkUtils.getZkClient().getChildren(root));
+  }
+
+  @Test
+  public void testCleanUpZkBarrierVersion() {
+    String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
+    zkUtils.getZkClient().createPersistent(root, true);
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null);
+    for (int i = 200; i < 210; i++) {
+      barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c")));
+    }
+
+    zkUtils.deleteOldBarrierVersions(5);
+    List<String> zNodeIds = zkUtils.getZkClient().getChildren(root);
+    Collections.sort(zNodeIds);
+    Assert.assertEquals(Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209"),
+        zNodeIds);
+  }
+
+  @Test
+  public void testCleanUpZk() {
+    String pathA = "/path/testA";
+    String pathB = "/path/testB";
+    zkUtils.getZkClient().createPersistent(pathA, true);
+    zkUtils.getZkClient().createPersistent(pathB, true);
+
+    // Create 100 nodes
+    for (int i = 0; i < 20; i++) {
+      String p1 = pathA + "/" + i;
+      zkUtils.getZkClient().createPersistent(p1, true);
+      zkUtils.getZkClient().createPersistent(p1 + "/something1", true);
+      zkUtils.getZkClient().createPersistent(p1 + "/something2", true);
+
+      String p2 = pathB + "/some_" + i;
+      zkUtils.getZkClient().createPersistent(p2, true);
+      zkUtils.getZkClient().createPersistent(p2 + "/something1", true);
+      zkUtils.getZkClient().createPersistent(p2 + "/something2", true);
+    }
+
+    List<String> zNodeIds = new ArrayList<>();
+    // empty list
+    zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        return o1.compareTo(o2);
+      }
+    });
+
+
+    zNodeIds = zkUtils.getZkClient().getChildren(pathA);
+    zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        return Integer.valueOf(o1) - Integer.valueOf(o2);
+      }
+    });
+
+    for (int i = 0; i < 10; i++) {
+      // should be gone
+      String p1 = pathA + "/" + i;
+      Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
+    }
+
+    for (int i = 10; i < 20; i++) {
+      // should be gone
+      String p1 = pathA + "/" + i;
+      Assert.assertTrue("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
+    }
+
+    zNodeIds = zkUtils.getZkClient().getChildren(pathB);
+    zkUtils.deleteOldVersionPath(pathB, zNodeIds, 1, new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        return Integer.valueOf(o1.substring(o1.lastIndexOf("_") + 1)) - Integer
+            .valueOf(o2.substring(o2.lastIndexOf("_") + 1));
+      }
+    });
+
+    for (int i = 0; i < 19; i++) {
+      // should be gone
+      String p1 = pathB + "/" + i;
+      Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
+    }
+
+    for (int i = 19; i < 20; i++) {
+      // should be gone
+      String p1 = pathB + "/some_" + i;
+      Assert.assertTrue("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
+    }
+
+  }
+
   public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
     long delay = startDelayMs;
     while (delay < maxDelayMs) {
index 1a13825..7253b29 100644 (file)
@@ -20,7 +20,6 @@
 package org.apache.samza.processor;
 
 import java.util.concurrent.CountDownLatch;
-import org.apache.samza.zk.TestZkUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -146,8 +145,8 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
     LOG.info("containerStopped latch = " + containerStopped1);
     waitForProcessorToStartStop(containerStopped1);
 
-    // let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(600);
+    // read again the first batch
+    waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
 
     // produce the second batch of the messages, starting with 'messageCount'
     produceMessages(messageCount, inputTopic, messageCount);
@@ -226,8 +225,8 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
     LOG.info("containerStopped latch = " + containerStopped2);
     waitForProcessorToStartStop(containerStopped2);
 
-    // let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(300);
+    // read again the first batch
+    waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
 
     // produce the second batch of the messages, starting with 'messageCount'
     produceMessages(messageCount, inputTopic, messageCount);
index 4cbe252..f2f1585 100644 (file)
@@ -287,7 +287,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
         System.out.println("2read all. current count = " + leftEventsCount);
         break;
       }
-      TestZkUtils.sleepMs(3000);
+      TestZkUtils.sleepMs(5000);
       attempts--;
     }
     Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
index 10b08d9..6aab5e3 100644 (file)
@@ -112,6 +112,9 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
       waitForProcessorToStartStop(containerStopLatches[i]);
     }
 
+    // read again the first batch
+    waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount);
+
     produceMessages(messageCount, inputTopic, messageCount);
 
     waitUntilMessagesLeftN(0);
index 77e2a49..ebbe07b 100644 (file)
@@ -44,6 +44,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
@@ -83,6 +84,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
   private static final String TEST_JOB_NAME = "test-job";
+  private static final String TASK_SHUTDOWN_MS = "5000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
 
   private String inputKafkaTopic;
@@ -169,6 +171,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         .put(ApplicationConfig.APP_ID, appId)
         .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
         .put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
+        .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
         .build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));