CURATOR-498
author randgalt <randgalt@apache.org>
Mon, 28 Jan 2019 19:23:15 +0000 (14:23 -0500)
committer randgalt <randgalt@apache.org>
Mon, 28 Jan 2019 19:23:15 +0000 (14:23 -0500)
Kudos to user Shay Shimony for his tireless and excellent work tracking this down. There are two problems addressed here: 1) Protected create mode can potentially find a ZNode that is about to be deleted due to an expired session. CreateBuilderImpl now keeps track of the session ID when the create is initiated. If after a connection loss the session ID has changed, any found protected node is ignored as it will soon be deleted. 2) For ZooKeeper 3.4.x the simulated (via reflection) InjectSessionExpiration was incorrectly setting the connection state to closed which caused the session expiration to be ignored.

curator-client/src/main/java/org/apache/curator/utils/InjectSessionExpiration.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java

index 996e9a2..caf9cbf 100644 (file)
@@ -30,7 +30,6 @@ import java.lang.reflect.Method;
 public class InjectSessionExpiration
 {
     private static final Field cnxnField;
-    private static final Field stateField;
     private static final Field eventThreadField;
     private static final Field sendThreadField;
     private static final Method queueEventMethod;
@@ -40,7 +39,6 @@ public class InjectSessionExpiration
     static
     {
         Field localCnxnField;
-        Field localStateField;
         Field localEventThreadField;
         Field localSendThreadField;
         Method localQueueEventMethod;
@@ -55,8 +53,6 @@ public class InjectSessionExpiration
 
             localCnxnField = ZooKeeper.class.getDeclaredField("cnxn");
             localCnxnField.setAccessible(true);
-            localStateField = ClientCnxn.class.getDeclaredField("state");
-            localStateField.setAccessible(true);
             localEventThreadField = ClientCnxn.class.getDeclaredField("eventThread");
             localEventThreadField.setAccessible(true);
             localSendThreadField = ClientCnxn.class.getDeclaredField("sendThread");
@@ -75,7 +71,6 @@ public class InjectSessionExpiration
             throw new RuntimeException("Could not access internal ZooKeeper fields", e);
         }
         cnxnField = localCnxnField;
-        stateField = localStateField;
         eventThreadField = localEventThreadField;
         sendThreadField = localSendThreadField;
         queueEventMethod = localQueueEventMethod;
@@ -94,7 +89,7 @@ public class InjectSessionExpiration
             Object eventThread = eventThreadField.get(clientCnxn);
             queueEventMethod.invoke(eventThread, event);
             queueEventOfDeathMethod.invoke(eventThread);
-            stateField.set(clientCnxn, ZooKeeper.States.CLOSED);
+            // we used to set the state field to CLOSED here but this resulted in CURATOR-498
             Object sendThread = sendThreadField.get(clientCnxn);
             Object clientCnxnSocket = getClientCnxnSocketMethod.invoke(sendThread);
             wakeupCnxnMethod.invoke(clientCnxnSocket);
index ce82542..d1ff2bb 100644 (file)
@@ -36,10 +36,12 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -48,19 +50,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String>
 {
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFrameworkImpl client;
     private CreateMode createMode;
     private Backgrounding backgrounding;
     private boolean createParentsIfNeeded;
     private boolean createParentsAsContainers;
-    private boolean doProtected;
     private boolean compress;
     private boolean setDataIfExists;
     private int setDataIfExistsVersion = -1;
-    private String protectedId;
     private ACLing acling;
     private Stat storingStat;
     private long ttl;
+    private boolean doProtected;
+    private String protectedId;
+    private long initialSessionId;
 
     @VisibleForTesting
     boolean failNextCreateForTesting = false;
@@ -77,11 +81,13 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
         createParentsIfNeeded = false;
         createParentsAsContainers = false;
         compress = false;
-        doProtected = false;
         setDataIfExists = false;
-        protectedId = null;
         storingStat = null;
         ttl = -1;
+        initialSessionId = 0;
+
+        doProtected = false;
+        protectedId = null;
     }
 
     public CreateBuilderImpl(CuratorFrameworkImpl client, CreateMode createMode, Backgrounding backgrounding, boolean createParentsIfNeeded, boolean createParentsAsContainers, boolean doProtected, boolean compress, boolean setDataIfExists, List<ACL> aclList, Stat storingStat, long ttl)
@@ -618,9 +624,9 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                  */
                 new FindAndDeleteProtectedNodeInBackground(client, ZKPaths.getPathAndNode(adjustedPath).getPath(), protectedId).execute();
                 /*
-                * The current UUID is scheduled to be deleted, it is not safe to use it again.
-                * If this builder is used again later create a new UUID
-                */
+                 * The current UUID is scheduled to be deleted, it is not safe to use it again.
+                 * If this builder is used again later create a new UUID
+                 */
                 protectedId = UUID.randomUUID().toString();
             }
 
@@ -685,16 +691,16 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
             else
             {
                 CreateZK35.create
-                (
-                    client.getZooKeeper(),
-                    operationAndData.getData().getPath(),
-                    data,
-                    acling.getAclList(operationAndData.getData().getPath()),
-                    createMode,
-                    mainCallback,
-                    backgrounding.getContext(),
-                    ttl
-                );
+                    (
+                        client.getZooKeeper(),
+                        operationAndData.getData().getPath(),
+                        data,
+                        acling.getAclList(operationAndData.getData().getPath()),
+                        createMode,
+                        mainCallback,
+                        backgrounding.getContext(),
+                        ttl
+                    );
             }
         }
         catch ( Throwable e )
@@ -748,7 +754,7 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
 
             @Override
             public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback callback, Object context,
-                    Executor executor) {
+                                                                      Executor executor) {
                 return CreateBuilderImpl.this.inBackground(callback, context, executor);
             }
 
@@ -1104,6 +1110,10 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
             {
                 boolean callSuper = true;
                 boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
+                if ( initialSessionId == 0 )
+                {
+                    initialSessionId = client.getZooKeeper().getSessionId();
+                }
                 if ( !localFirstTime && doProtected )
                 {
                     debugForceFindProtectedNode = false;
@@ -1134,8 +1144,8 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
 
                 if ( failNextCreateForTesting )
                 {
-                    pathInForeground(path, data, acling.getAclList(path));   // simulate success on server without notification to client
                     failNextCreateForTesting = false;
+                    pathInForeground(path, data, acling.getAclList(path));   // simulate success on server without notification to client
                     throw new KeeperException.ConnectionLossException();
                 }
 
@@ -1162,6 +1172,10 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                     public String call() throws Exception
                     {
                         boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
+                        if ( initialSessionId == 0 )
+                        {
+                            initialSessionId = client.getZooKeeper().getSessionId();
+                        }
 
                         String createdPath = null;
                         if ( !localFirstTime && doProtected )
@@ -1253,6 +1267,22 @@ public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, Backgro
                             List<String> children = client.getZooKeeper().getChildren(pathAndNode.getPath(), false);
 
                             foundNode = findNode(children, pathAndNode.getPath(), protectedId);
+                            log.debug("Protected mode findNode result: {}", foundNode);
+
+                            if ( doProtected && createMode.isEphemeral() )
+                            {
+                                if ( initialSessionId != client.getZooKeeper().getSessionId() )
+                                {
+                                    log.info("Session has changed during protected mode with ephemeral. old: {} new: {}", initialSessionId, client.getZooKeeper().getSessionId());
+                                    if ( foundNode != null )
+                                    {
+                                        log.info("Deleted old session's found node: {}", foundNode);
+                                        client.getFailedDeleteManager().executeGuaranteedOperationInBackground(foundNode);
+                                        foundNode = null;
+                                    }
+                                    initialSessionId = client.getZooKeeper().getSessionId();
+                                }
+                            }
                         }
                         catch ( KeeperException.NoNodeException ignore )
                         {
index a28d6c5..066cee2 100644 (file)
@@ -30,11 +30,15 @@ import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.compatibility.KillSession2;
 import org.apache.curator.test.compatibility.Timing2;
@@ -50,11 +54,13 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +68,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestFrameworkEdges extends BaseClassForTests
 {
-
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Timing2 timing = new Timing2();
 
@@ -73,6 +78,97 @@ public class TestFrameworkEdges extends BaseClassForTests
     }
 
     @Test
+    public void testInjectSessionExpiration() throws Exception
+    {
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)))
+        {
+            client.start();
+
+            CountDownLatch expiredLatch = new CountDownLatch(1);
+            Watcher watcher = event -> {
+                if ( event.getState() == Watcher.Event.KeeperState.Expired )
+                {
+                    expiredLatch.countDown();
+                }
+            };
+            client.checkExists().usingWatcher(watcher).forPath("/foobar");
+            KillSession2.kill(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(expiredLatch));
+        }
+    }
+
+    @Test
+    public void testProtectionWithKilledSession() throws Exception
+    {
+        server.stop();  // the default test server is not needed; this test starts its own TestingCluster
+
+        // see CURATOR-498
+        // attempt to re-create the state described in the bug report: create a 3-instance ensemble;
+        // have Curator connect to only one of those instances; set failNextCreateForTesting to
+        // simulate protection mode searching; kill the connected server when this happens;
+        // wait for session timeout to elapse and then restart the instance. In most cases
+        // this will cause the scenario as Curator will send the session cancel and do protection mode
+        // search around the same time. The protection mode search should return first as it can be resolved
+        // by the Instance Curator is connected to but the session kill needs a quorum vote (it's a
+        // transaction)
+
+        try (TestingCluster cluster = new TestingCluster(3))
+        {
+            cluster.start();
+            InstanceSpec instanceSpec0 = cluster.getServers().get(0).getInstanceSpec();
+
+            CountDownLatch serverStoppedLatch = new CountDownLatch(1);
+            RetryPolicy retryPolicy = new RetryForever(100)
+            {
+                @Override
+                public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+                {
+                    if ( serverStoppedLatch.getCount() > 0 )
+                    {
+                        try
+                        {
+                            cluster.killServer(instanceSpec0);
+                        }
+                        catch ( Exception e )
+                        {
+                            // ignore
+                        }
+                        serverStoppedLatch.countDown();
+                    }
+                    return super.allowRetry(retryCount, elapsedTimeMs, sleeper);
+                }
+            };
+
+            try (CuratorFramework client = CuratorFrameworkFactory.newClient(instanceSpec0.getConnectString(), timing.session(), timing.connection(), retryPolicy))
+            {
+                BlockingQueue<String> createdNode = new LinkedBlockingQueue<>();
+                BackgroundCallback callback = (__, event) -> {
+                    if ( event.getType() == CuratorEventType.CREATE )
+                    {
+                        createdNode.offer(event.getPath());
+                    }
+                };
+
+                client.start();
+                client.create().forPath("/test");
+
+                ErrorListenerPathAndBytesable<String> builder = client.create().withProtection().withMode(CreateMode.EPHEMERAL).inBackground(callback);
+                ((CreateBuilderImpl)builder).failNextCreateForTesting = true;
+
+                builder.forPath("/test/hey");
+
+                Assert.assertTrue(timing.awaitLatch(serverStoppedLatch));
+                timing.forSessionSleep().sleep();   // wait for session to expire
+                cluster.restartServer(instanceSpec0);
+
+                String path = timing.takeFromQueue(createdNode);
+                List<String> children = client.getChildren().forPath("/test");
+                Assert.assertEquals(Collections.singletonList(ZKPaths.getNodeFromPath(path)), children);
+            }
+        }
+    }
+
+    @Test
     public void testBackgroundLatencyUnSleep() throws Exception
     {
         server.stop();