Another edge case found by Evaristo. When the SUSPEND is set, a background sync is...
author: randgalt <randgalt@apache.org>
Sun, 12 Jan 2014 21:53:03 +0000 (16:53 -0500)
committer: randgalt <randgalt@apache.org>
Sun, 12 Jan 2014 21:54:17 +0000 (16:54 -0500)
curator-client/src/main/java/org/apache/curator/ConnectionState.java
curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java [new file with mode: 0644]

index e02ee88..4978c3f 100644 (file)
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
@@ -49,6 +50,7 @@ class ConnectionState implements Watcher, Closeable
     private final AtomicReference<TracerDriver> tracer;
     private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+    private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
@@ -131,6 +133,11 @@ class ConnectionState implements Watcher, Closeable
         parentWatchers.remove(watcher);
     }
 
+    /**
+     * Reads the instance index counter kept by this connection state. The counter
+     * is created with its default initial value and is incremented at the start of
+     * a reset, ahead of the call to {@code zooKeeper.closeAndReset()}.
+     *
+     * @return the current counter value
+     */
+    long getInstanceIndex()
+    {
+        final long currentIndex = instanceIndex.get();
+        return currentIndex;
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
@@ -204,6 +211,8 @@ class ConnectionState implements Watcher, Closeable
     {
         log.debug("reset");
 
+        instanceIndex.incrementAndGet();
+
         isConnected.set(false);
         connectionStartMs = System.currentTimeMillis();
         zooKeeper.closeAndReset();
index f4e56f9..f0a4ab3 100644 (file)
@@ -279,6 +279,17 @@ public class CuratorZookeeperClient implements Closeable
         return connectionTimeoutMs;
     }
 
+    /**
+     * Returns the "instance index" tracked by the underlying connection state.
+     * The value is incremented every time a new {@link ZooKeeper} instance is
+     * allocated, so two differing readings mean the handle was replaced in
+     * between them.
+     *
+     * @return the current instance index
+     */
+    public long getInstanceIndex()
+    {
+        final long currentIndex = state.getInstanceIndex();
+        return currentIndex;
+    }
+
     void        addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);
index 1b0ef3f..f1258ea 100644 (file)
@@ -606,14 +606,27 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         connectionStateManager.setToSuspended();
 
+        final long instanceIndex = client.getInstanceIndex();
+
         // we appear to have disconnected, force a new ZK event and see if we can connect to another server
-        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
+        final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null);
         OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
         {
             @Override
             public void retriesExhausted(OperationAndData<String> operationAndData)
             {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
+                // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
+                // so the pending background sync is no longer valid
+                long newInstanceIndex = client.getInstanceIndex();
+                if ( instanceIndex == newInstanceIndex )
+                {
+                    connectionStateManager.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
+                    performBackgroundOperation(new OperationAndData<String>(operation, "/", null, this, null));
+                }
             }
         };
         performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null));
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
new file mode 100644 (file)
index 0000000..e634a6d
--- /dev/null
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Checks the sequence of connection states reported to a listener while the
+ * ZooKeeper test server is started and stopped twice underneath a running
+ * client that also hosts a {@link LeaderSelector}.
+ */
+public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Starts the client against a stopped server, cycles the server up and down twice,
+     * and asserts that the listener recorded
+     * CONNECTED, SUSPENDED, LOST, RECONNECTED, SUSPENDED, LOST in that order.
+     *
+     * @throws Exception on any unexpected failure while driving the server or client
+     */
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        // begin with the server down so the client starts out unable to connect
+        server.close();
+
+        // StringBuffer rather than StringBuilder: the listener below appends to it while
+        // this test thread reads it at the end. NOTE(review): the listener is presumably
+        // invoked on a thread other than the test thread - confirm against the client.
+        final StringBuffer listenerSequence = new StringBuffer();
+        LeaderSelector selector = null;
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        try
+        {
+            client.start();
+            timing.sleepABit();
+
+            LeaderSelectorListener listenerLeader = new LeaderSelectorListenerAdapter()
+            {
+                @Override
+                public void takeLeadership(CuratorFramework client) throws Exception
+                {
+                    // joining the current thread never completes on its own, so this
+                    // method only exits if the thread is interrupted
+                    Thread.currentThread().join();
+                }
+            };
+            selector = new LeaderSelector(client, "/leader", listenerLeader);
+            selector.autoRequeue();
+            selector.start();
+
+            // records every state change as "-<STATE>" so the full order can be asserted at once
+            ConnectionStateListener listener1 = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    listenerSequence.append("-").append(newState);
+                }
+            };
+
+            client.getConnectionStateListenable().addListener(listener1);
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            log.debug("Starting ZK server");
+            server = new TestingServer(server.getPort());
+            timing.forWaiting().sleepABit();
+
+            log.debug("Stopping ZK server");
+            server.close();
+            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+        }
+        finally
+        {
+            Closeables.closeQuietly(selector);
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file