CURATOR-498
authorrandgalt <randgalt@apache.org>
Mon, 4 Feb 2019 15:44:24 +0000 (10:44 -0500)
committerrandgalt <randgalt@apache.org>
Mon, 4 Feb 2019 15:44:24 +0000 (10:44 -0500)
Removed no-longer-necessary KillSession2. Also, now always use the reflection based code to insert the session end event given what we've found about the previous version inside of ZooKeeper. I also opened a PR in ZooKeeper (see ZOOKEEPER-3269) to add a supported method to do this for the future.

22 files changed:
curator-client/src/main/java/org/apache/curator/utils/Compatibility.java
curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
curator-client/src/test/java/org/apache/curator/BasicTests.java
curator-client/src/test/java/org/apache/curator/TestSessionFailRetryLoop.java
curator-framework/src/main/java/org/apache/curator/framework/imps/ProtectedMode.java
curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
curator-test-zk34/src/test/java/org/apache/curator/test/Compatibility.java
curator-test/src/main/java/org/apache/curator/test/Compatibility.java
curator-test/src/main/java/org/apache/curator/test/compatibility/KillSession2.java [deleted file]
curator-x-discovery/src/test/java/org/apache/curator/x/discovery/details/TestServiceDiscovery.java

index 58b62a7..1ee2301 100644 (file)
  */
 package org.apache.curator.utils;
 
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
 
 /**
  * Utils to help with ZK 3.4.x compatibility
@@ -27,20 +31,36 @@ import org.slf4j.LoggerFactory;
 public class Compatibility
 {
     private static final boolean hasZooKeeperAdmin;
+    private static final Method queueEventMethod;
+    private static final Logger logger = LoggerFactory.getLogger(Compatibility.class);
+
     static
     {
-        boolean hasIt;
+        boolean localHasZooKeeperAdmin;
         try
         {
             Class.forName("org.apache.zookeeper.admin.ZooKeeperAdmin");
-            hasIt = true;
+            localHasZooKeeperAdmin = true;
         }
         catch ( ClassNotFoundException e )
         {
-            hasIt = false;
-            LoggerFactory.getLogger(Compatibility.class).info("Running in ZooKeeper 3.4.x compatibility mode");
+            localHasZooKeeperAdmin = false;
+            logger.info("Running in ZooKeeper 3.4.x compatibility mode");
+        }
+        hasZooKeeperAdmin = localHasZooKeeperAdmin;
+
+        Method localQueueEventMethod;
+        try
+        {
+            Class<?> testableClass = Class.forName("org.apache.zookeeper.Testable");
+            localQueueEventMethod = testableClass.getMethod("queueEvent", WatchedEvent.class);
+        }
+        catch ( ReflectiveOperationException ignore )
+        {
+            localQueueEventMethod = null;
+            LoggerFactory.getLogger(Compatibility.class).info("Using emulated InjectSessionExpiration");
         }
-        hasZooKeeperAdmin = hasIt;
+        queueEventMethod = localQueueEventMethod;
     }
 
     /**
@@ -61,16 +81,21 @@ public class Compatibility
      */
     public static void injectSessionExpiration(ZooKeeper zooKeeper)
     {
-        if ( isZK34() )
+        if ( isZK34() || (queueEventMethod == null) )
         {
             InjectSessionExpiration.injectSessionExpiration(zooKeeper);
         }
         else
         {
-            // LOL - this method was proposed by me (JZ) in 2013 for totally unrelated reasons
-            // it got added to ZK 3.5 and now does exactly what we need
-            // https://issues.apache.org/jira/browse/ZOOKEEPER-1730
-            zooKeeper.getTestable().injectSessionExpiration();
+            try
+            {
+                WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+                queueEventMethod.invoke(zooKeeper.getTestable(), event);
+            }
+            catch ( Exception e )
+            {
+                logger.error("Could not call Testable.queueEvent()", e);
+            }
         }
     }
 }
index caf9cbf..8ad2b5d 100644 (file)
@@ -31,40 +31,22 @@ public class InjectSessionExpiration
 {
     private static final Field cnxnField;
     private static final Field eventThreadField;
-    private static final Field sendThreadField;
     private static final Method queueEventMethod;
-    private static final Method queueEventOfDeathMethod;
-    private static final Method getClientCnxnSocketMethod;
-    private static final Method wakeupCnxnMethod;
     static
     {
         Field localCnxnField;
         Field localEventThreadField;
-        Field localSendThreadField;
         Method localQueueEventMethod;
-        Method localEventOfDeathMethod;
-        Method localGetClientCnxnSocketMethod;
-        Method localWakeupCnxnMethod;
         try
         {
             Class<?> eventThreadClass = Class.forName("org.apache.zookeeper.ClientCnxn$EventThread");
-            Class<?> sendThreadClass = Class.forName("org.apache.zookeeper.ClientCnxn$SendThread");
-            Class<?> clientCnxnSocketClass = Class.forName("org.apache.zookeeper.ClientCnxnSocket");
 
             localCnxnField = ZooKeeper.class.getDeclaredField("cnxn");
             localCnxnField.setAccessible(true);
             localEventThreadField = ClientCnxn.class.getDeclaredField("eventThread");
             localEventThreadField.setAccessible(true);
-            localSendThreadField = ClientCnxn.class.getDeclaredField("sendThread");
-            localSendThreadField.setAccessible(true);
             localQueueEventMethod = eventThreadClass.getDeclaredMethod("queueEvent", WatchedEvent.class);
             localQueueEventMethod.setAccessible(true);
-            localEventOfDeathMethod = eventThreadClass.getDeclaredMethod("queueEventOfDeath");
-            localEventOfDeathMethod.setAccessible(true);
-            localGetClientCnxnSocketMethod = sendThreadClass.getDeclaredMethod("getClientCnxnSocket");
-            localGetClientCnxnSocketMethod.setAccessible(true);
-            localWakeupCnxnMethod = clientCnxnSocketClass.getDeclaredMethod("wakeupCnxn");
-            localWakeupCnxnMethod.setAccessible(true);
         }
         catch ( ReflectiveOperationException e )
         {
@@ -72,11 +54,7 @@ public class InjectSessionExpiration
         }
         cnxnField = localCnxnField;
         eventThreadField = localEventThreadField;
-        sendThreadField = localSendThreadField;
         queueEventMethod = localQueueEventMethod;
-        queueEventOfDeathMethod = localEventOfDeathMethod;
-        getClientCnxnSocketMethod = localGetClientCnxnSocketMethod;
-        wakeupCnxnMethod = localWakeupCnxnMethod;
     }
 
     public static void injectSessionExpiration(ZooKeeper zooKeeper)
@@ -88,11 +66,8 @@ public class InjectSessionExpiration
             ClientCnxn clientCnxn = (ClientCnxn)cnxnField.get(zooKeeper);
             Object eventThread = eventThreadField.get(clientCnxn);
             queueEventMethod.invoke(eventThread, event);
-            queueEventOfDeathMethod.invoke(eventThread);
-            // we used to set the state field to CLOSED here but this resulted in CURATOR-498
-            Object sendThread = sendThreadField.get(clientCnxn);
-            Object clientCnxnSocket = getClientCnxnSocketMethod.invoke(sendThread);
-            wakeupCnxnMethod.invoke(clientCnxnSocket);
+
+            // we used to set the state field to CLOSED here and a few other things but this resulted in CURATOR-498
         }
         catch ( ReflectiveOperationException e )
         {
index 2875f49..f951fb5 100644 (file)
@@ -21,8 +21,8 @@ package org.apache.curator;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -100,7 +100,7 @@ public class BasicTests extends BaseClassForTests
                                 // ignore
                             }
 
-                            KillSession2.kill(client.getZooKeeper());
+                            Compatibility.injectSessionExpiration(client.getZooKeeper());
 
                             Assert.assertTrue(timing.awaitLatch(latch));
                         }
index 39b0e45..7c9c963 100644 (file)
@@ -20,9 +20,9 @@ package org.apache.curator;
 
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Callable;
@@ -57,7 +57,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                 if ( firstTime.compareAndSet(true, false) )
                                 {
                                     Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                    KillSession2.kill(client.getZooKeeper());
+                                    Compatibility.injectSessionExpiration(client.getZooKeeper());
                                     client.getZooKeeper();
                                     client.blockUntilConnectedOrTimedOut();
                                 }
@@ -131,7 +131,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                     if ( firstTime.compareAndSet(true, false) )
                                     {
                                         Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                        KillSession2.kill(client.getZooKeeper());
+                                        Compatibility.injectSessionExpiration(client.getZooKeeper());
                                         client.getZooKeeper();
                                         client.blockUntilConnectedOrTimedOut();
                                     }
@@ -196,7 +196,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                 public Void call() throws Exception
                                 {
                                     Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                    KillSession2.kill(client.getZooKeeper());
+                                    Compatibility.injectSessionExpiration(client.getZooKeeper());
 
                                     timing.sleepABit();
 
@@ -258,7 +258,7 @@ public class TestSessionFailRetryLoop extends BaseClassForTests
                                     public Void call() throws Exception
                                     {
                                         Assert.assertNull(client.getZooKeeper().exists("/foo/bar", false));
-                                        KillSession2.kill(client.getZooKeeper());
+                                        Compatibility.injectSessionExpiration(client.getZooKeeper());
 
                                         client.getZooKeeper();
                                         client.blockUntilConnectedOrTimedOut();
index cd33ab0..5c2f052 100644 (file)
@@ -30,9 +30,9 @@ import java.util.UUID;
 class ProtectedMode
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private boolean doProtected = false;
-    private String protectedId = null;
-    private long sessionId = 0;
+    private volatile boolean doProtected = false;
+    private volatile String protectedId = null;
+    private volatile long sessionId = 0;
 
     /**
      * Enable protected mode
@@ -95,16 +95,17 @@ class ProtectedMode
     {
         if ( doProtected && createMode.isEphemeral() )
         {
-            if ( sessionId != client.getZooKeeper().getSessionId() )
+            long clientSessionId = client.getZooKeeper().getSessionId();
+            if ( this.sessionId != clientSessionId )
             {
-                log.info("Session has changed during protected mode with ephemeral. old: {} new: {}", sessionId, client.getZooKeeper().getSessionId());
+                log.info("Session has changed during protected mode with ephemeral. old: {} new: {}", this.sessionId, clientSessionId);
                 if ( foundNode != null )
                 {
                     log.info("Deleted old session's found node: {}", foundNode);
                     client.getFailedDeleteManager().executeGuaranteedOperationInBackground(foundNode);
                     foundNode = null;
                 }
-                sessionId = client.getZooKeeper().getSessionId();
+                this.sessionId = clientSessionId;
             }
         }
         return foundNode;
index e61ee9f..773d9c9 100644 (file)
@@ -25,9 +25,9 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -127,7 +127,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
     {
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
 
-        KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+        Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
 
         Assert.assertEquals(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
index 066cee2..7c6d156 100644 (file)
@@ -40,9 +40,9 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -92,7 +92,7 @@ public class TestFrameworkEdges extends BaseClassForTests
                 }
             };
             client.checkExists().usingWatcher(watcher).forPath("/foobar");
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             Assert.assertTrue(timing.awaitLatch(expiredLatch));
         }
     }
@@ -523,22 +523,18 @@ public class TestFrameworkEdges extends BaseClassForTests
         {
             client.create().forPath("/sessionTest");
 
-            final AtomicBoolean sessionDied = new AtomicBoolean(false);
-            Watcher watcher = new Watcher()
-            {
-                @Override
-                public void process(WatchedEvent event)
+            CountDownLatch sessionDiedLatch = new CountDownLatch(1);
+            Watcher watcher = event -> {
+                if ( event.getState() == Watcher.Event.KeeperState.Expired )
                 {
-                    if ( event.getState() == Event.KeeperState.Expired )
-                    {
-                        sessionDied.set(true);
-                    }
+                    sessionDiedLatch.countDown();
                 }
             };
+
             client.checkExists().usingWatcher(watcher).forPath("/sessionTest");
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(sessionDiedLatch));
             Assert.assertNotNull(client.checkExists().forPath("/sessionTest"));
-            Assert.assertTrue(sessionDied.get());
         }
         finally
         {
index bb8aa73..446b7cb 100644 (file)
@@ -467,6 +467,12 @@ public class LeaderLatch implements Closeable
     }
 
     @VisibleForTesting
+    String getOurPath()
+    {
+        return ourPath.get();
+    }
+
+    @VisibleForTesting
     volatile CountDownLatch debugResetWaitLatch = null;
 
     @VisibleForTesting
@@ -524,8 +530,16 @@ public class LeaderLatch implements Closeable
         }
     }
 
+    @VisibleForTesting
+    volatile CountDownLatch debugCheckLeaderShipLatch = null;
+
     private void checkLeadership(List<String> children) throws Exception
     {
+        if ( debugCheckLeaderShipLatch != null )
+        {
+            debugCheckLeaderShipLatch.await();
+        }
+
         final String localOurPath = ourPath.get();
         List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
         int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
index b984624..175ccdf 100644 (file)
@@ -26,7 +26,6 @@ import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -164,8 +163,20 @@ public class BaseTestTreeCache extends BaseClassForTests
      */
     TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType, String expectedPath, byte[] expectedData) throws InterruptedException
     {
+        return assertEvent(expectedType, expectedPath, expectedData, false);
+    }
+
+    TreeCacheEvent assertEvent(TreeCacheEvent.Type expectedType, String expectedPath, byte[] expectedData, boolean ignoreConnectionEvents) throws InterruptedException
+    {
         TreeCacheEvent event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
         Assert.assertNotNull(event, String.format("Expected type: %s, path: %s", expectedType, expectedPath));
+        if ( ignoreConnectionEvents )
+        {
+            if ( (event.getType() == TreeCacheEvent.Type.CONNECTION_SUSPENDED) || (event.getType() == TreeCacheEvent.Type.CONNECTION_LOST) || (event.getType() == TreeCacheEvent.Type.CONNECTION_RECONNECTED) )
+            {
+                return assertEvent(expectedType, expectedPath, expectedData, ignoreConnectionEvents);
+            }
+        }
 
         String message = event.toString();
         Assert.assertEquals(event.getType(), expectedType, message);
index 253c777..ff416d5 100644 (file)
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.cache;
 
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -28,6 +27,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Callable;
@@ -196,7 +196,7 @@ public class TestNodeCache extends BaseClassForTests
                 }
             );
 
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             Thread.sleep(timing.multiple(1.5).session());
 
             Assert.assertEquals(cache.getCurrentData().getData(), "start".getBytes());
index edaac37..78fabd5 100644 (file)
@@ -32,8 +32,8 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -190,11 +190,11 @@ public class TestPathChildrenCache extends BaseClassForTests
             cache.getListenable().addListener(listener);
             cache.start();
             Assert.assertTrue(timing.awaitLatch(ensurePathLatch));
-            
+
             final CountDownLatch connectedLatch = new CountDownLatch(1);
             client.getConnectionStateListenable().addListener(new ConnectionStateListener()
             {
-                
+
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
@@ -206,7 +206,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             });
 
             server = new TestingServer(serverPort, true);
-            
+
             Assert.assertTrue(timing.awaitLatch(connectedLatch));
 
             client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3});
@@ -814,7 +814,7 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
             Assert.assertTrue(timing.awaitLatch(childAddedLatch));
 
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             Assert.assertTrue(timing.awaitLatch(lostLatch));
             Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
             Assert.assertTrue(timing.awaitLatch(removedLatch));
index 9631d12..1e97ce2 100644 (file)
@@ -23,14 +23,12 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestTreeCache extends BaseTestTreeCache
@@ -425,21 +423,9 @@ public class TestTreeCache extends BaseTestTreeCache
         client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
         assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me");
 
-        KillSession2.kill(client.getZookeeperClient().getZooKeeper());
-        if ( client.isZk34CompatibilityMode() )
-        {
-            assertEvent(TreeCacheEvent.Type.CONNECTION_LOST);
-            assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes());
-            assertEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED);
-            assertEvent(TreeCacheEvent.Type.INITIALIZED);
-        }
-        else
-        {
-            assertEvent(TreeCacheEvent.Type.CONNECTION_LOST);
-            assertEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED);
-            assertEvent(TreeCacheEvent.Type.INITIALIZED);
-            assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes());
-        }
+        Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
+        assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes(), true);
+        assertEvent(TreeCacheEvent.Type.INITIALIZED, null, null, true);
 
         assertNoMoreEvents();
     }
@@ -483,7 +469,6 @@ public class TestTreeCache extends BaseTestTreeCache
         client.create().forPath("/test");
         client.create().forPath("/test/one", "hey there".getBytes());
 
-
         cache = buildWithListeners(TreeCache.newBuilder(client, "/test").disableZkWatches(true));
 
         cache.start();
@@ -620,7 +605,8 @@ public class TestTreeCache extends BaseTestTreeCache
             @Override
             public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
             {
-                if (event.getType() == Type.NODE_UPDATED) {
+                if ( event.getType() == Type.NODE_UPDATED )
+                {
                     throw new RuntimeException("Test Exception");
                 }
             }
index 011e4a0..e3e0aeb 100644 (file)
@@ -70,6 +70,40 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
+    public void testWatchedNodeDeletedOnReconnect() throws Exception
+    {
+        final String latchPath = "/foo/bar";
+        Timing timing = new Timing();
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+            try (LeaderLatch latch1 = new LeaderLatch(client, latchPath) )
+            {
+                latch1.start();
+                latch1.await();
+
+                try ( LeaderLatch latch2 = new LeaderLatch(client, latchPath) )
+                {
+                    latch2.start(); // will get a watcher on latch1's node
+                    timing.sleepABit();
+
+                    latch2.debugCheckLeaderShipLatch = new CountDownLatch(1);
+                    client.delete().forPath(latch1.getOurPath());   // simulate the leader's path getting deleted
+                    timing.sleepABit(); // after this, latch2 should be blocked just before getting the path in checkLeadership()
+
+                    latch2.reset(); // force the internal "ourPath" to get reset
+                    latch2.debugCheckLeaderShipLatch.countDown();   // allow checkLeadership() to continue
+
+                    Assert.assertTrue(latch2.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+                    Assert.assertEquals(client.getChildren().forPath(latchPath).size(), 1);
+                    Assert.assertEquals(latch1.getLeader(), latch2.getLeader());
+                }
+            }
+        }
+    }
+
+    @Test
     public void testSessionErrorPolicy() throws Exception
     {
         Timing timing = new Timing();
index 9a5e42e..b92c3a2 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
@@ -33,14 +32,11 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.utils.ZKPaths;
+import org.apache.curator.utils.Compatibility;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import java.lang.reflect.Array;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -535,7 +531,7 @@ public class TestLeaderSelector extends BaseClassForTests
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
 
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
 
             Assert.assertTrue(timing.awaitLatch(interruptedLatch));
             timing.sleepABit();
index 8ab2dc5..e9645e8 100644 (file)
@@ -26,8 +26,8 @@ import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.schema.Schema;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -172,7 +172,7 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
             Assert.assertTrue(lock.isAcquiredInThisProcess());
 
             // Kill the session, check that lock node still exists
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             Assert.assertNotNull(client.checkExists().forPath(LOCK_PATH));
 
             // Release the lock and verify that the actual lock node created no longer exists
index cf44daf..0c62650 100644 (file)
@@ -27,11 +27,11 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -200,7 +200,7 @@ public abstract class TestInterProcessMutexBase extends BaseClassForTests
                 );
 
             Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             Assert.assertTrue(timing.forSessionSleep().acquireSemaphore(semaphore, 1));
         }
         finally
index 03665c5..87585af 100644 (file)
@@ -31,9 +31,9 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
@@ -329,7 +329,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
             node.debugCreateNodeLatch = new CountDownLatch(1);
-            KillSession2.kill(curator.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -359,7 +359,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
             node.debugCreateNodeLatch = new CountDownLatch(1);
-            KillSession2.kill(curator.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -400,7 +400,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
 
                 node.debugCreateNodeLatch = new CountDownLatch(1);
                 // Kill the session, thus cleaning up the node...
-                KillSession2.kill(curator.getZookeeperClient().getZooKeeper());
+                Compatibility.injectSessionExpiration(curator.getZookeeperClient().getZooKeeper());
 
                 // Make sure the node ended up getting deleted...
                 assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(), TimeUnit.SECONDS));
@@ -443,7 +443,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             Trigger deletedTrigger = Trigger.deletedOrSetData();
             observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
-            KillSession2.kill(nodeCreator.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(nodeCreator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted...
             assertTrue(deletedTrigger.firedWithin(timing.forWaiting().seconds(), TimeUnit.SECONDS));
@@ -536,10 +536,10 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             CloseableUtils.closeQuietly(node);
         }
     }
-    
+
     /**
      * Test that if a persistent ephemeral node is created and the node already exists
-     * that if data is present in the PersistentEphermalNode that it is still set. 
+     * that if data is present in the PersistentEphermalNode that it is still set.
      * @throws Exception
      */
     @Test
@@ -547,9 +547,9 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
     {
         CuratorFramework curator = newCurator();
         curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "InitialData".getBytes());
-        
+
         byte[] data = "Hello World".getBytes();
-             
+
         PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
         node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
         try
@@ -568,10 +568,10 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
     public void testSetDataWhenDisconnected() throws Exception
     {
         CuratorFramework curator = newCurator();
-        
+
         byte[] initialData = "Hello World".getBytes();
         byte[] updatedData = "Updated".getBytes();
-             
+
         PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
         try
         {
@@ -579,11 +579,11 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             node.start();
             node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
             assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
-            
+
             server.stop();
-            
+
             final CountDownLatch dataUpdateLatch = new CountDownLatch(1);
-            
+
             Watcher watcher = new Watcher()
             {
                                @Override
@@ -593,22 +593,22 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
                                        {
                                                dataUpdateLatch.countDown();
                                        }
-                               }               
+                               }
             };
-            
+
             curator.getData().usingWatcher(watcher).inBackground().forPath(node.getActualPath());
-            
+
             node.setData(updatedData);
             server.restart();
 
             assertTrue(timing.awaitLatch(dataUpdateLatch));
-                       
+
             assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData));
         }
         finally
         {
             CloseableUtils.closeQuietly(node);
-        }      
+        }
     }
 
     @Test
@@ -650,7 +650,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             CloseableUtils.closeQuietly(node);
         }
     }
-    
+
     /**
      * See CURATOR-190
      * For protected nodes on reconnect the current protected name was passed to the create builder meaning that it got
@@ -671,12 +671,12 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
             assertNodeExists(curator, node.getActualPath());
 
-            server.restart();            
-            
+            server.restart();
+
             curator.blockUntilConnected(5, TimeUnit.SECONDS);
 
             assertNodeExists(curator, node.getActualPath());
-            
+
             //There should only be a single child, the persisted ephemeral node
             List<String> children = curator.getChildren().forPath(DIR);
             assertFalse(children == null);
@@ -687,7 +687,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
             CloseableUtils.closeQuietly(node);
         }
     }
-    
+
     @Test
     public void testNoCreatePermission() throws Exception
     {
@@ -709,12 +709,12 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
 
                //New client without authentication
                client = newCurator();
-        
+
                node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, PATH,
                                                                    new byte[0]);
             node.debugWaitMsForBackgroundBeforeClose.set(timing.forSleepingABit().milliseconds());
                node.start();
-        
+
             node.waitForInitialCreate(timing.seconds(), TimeUnit.SECONDS);
             assertNodeDoesNotExist(client, PATH);
             assertTrue(node.isAuthFailure());
index bebd7c9..a30b870 100644 (file)
  */
 package org.apache.curator.test;
 
-import org.apache.curator.utils.InjectSessionExpiration;
-import org.apache.zookeeper.ZooKeeper;
-
 public class Compatibility
 {
     public static boolean isZK34()
     {
         return true;
     }
-
-    public static void injectSessionExpiration(ZooKeeper zooKeeper)
-    {
-        InjectSessionExpiration.injectSessionExpiration(zooKeeper);
-    }
 }
index 4fc63df..5b4b53f 100644 (file)
  */
 package org.apache.curator.test;
 
-import org.apache.zookeeper.ZooKeeper;
-
 public class Compatibility
 {
     public static boolean isZK34()
     {
         return false;
     }
-
-    public static void injectSessionExpiration(ZooKeeper zooKeeper)
-    {
-        zooKeeper.getTestable().injectSessionExpiration();
-    }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/compatibility/KillSession2.java b/curator-test/src/main/java/org/apache/curator/test/compatibility/KillSession2.java
deleted file mode 100644 (file)
index d747d3d..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.test.compatibility;
-
-import org.apache.curator.test.Compatibility;
-import org.apache.zookeeper.ZooKeeper;
-
-/**
- * <p>
- *     Utility to simulate a ZK session dying.
- * </p>
- */
-public class KillSession2
-{
-    /**
-     * Kill the given ZK session
-     *
-     * @param client the client to kill
-     */
-    public static void     kill(ZooKeeper client)
-    {
-        Compatibility.injectSessionExpiration(client);
-    }
-}
index b67bff9..54719a5 100644 (file)
@@ -25,9 +25,9 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.Compatibility;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -79,7 +79,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             timing.acquireSemaphore(semaphore, 2);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 2);
 
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             server.stop();
 
             server.restart();
@@ -121,7 +121,7 @@ public class TestServiceDiscovery extends BaseClassForTests
             timing.acquireSemaphore(semaphore);
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
 
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             server.stop();
 
             server.restart();
@@ -154,7 +154,7 @@ public class TestServiceDiscovery extends BaseClassForTests
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);
 
-            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Compatibility.injectSessionExpiration(client.getZookeeperClient().getZooKeeper());
             Thread.sleep(timing.multiple(1.5).session());
 
             Assert.assertEquals(discovery.queryForInstances("test").size(), 1);