When the connection is lost client.getZookeeperClient().getZooKeeper() needs to be...
authorrandgalt <randgalt@apache.org>
Tue, 20 Feb 2018 18:36:22 +0000 (13:36 -0500)
committerrandgalt <randgalt@apache.org>
Tue, 20 Feb 2018 18:57:50 +0000 (13:57 -0500)
curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java [new file with mode: 0644]
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java [new file with mode: 0644]
curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java

diff --git a/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java b/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java
new file mode 100644 (file)
index 0000000..36f0fd7
--- /dev/null
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.ensemble;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.Semaphore;
+
+public class TestEnsembleProvider extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        Semaphore counter = new Semaphore(0);
+        final CuratorZookeeperClient client = new CuratorZookeeperClient(new CountingEnsembleProvider(counter), timing.session(), timing.connection(), null, new RetryOneTime(2));
+        try
+        {
+            client.start();
+            Assert.assertTrue(timing.acquireSemaphore(counter));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testAfterSessionExpiration() throws Exception
+    {
+        Semaphore counter = new Semaphore(0);
+        final CuratorZookeeperClient client = new CuratorZookeeperClient(new CountingEnsembleProvider(counter), timing.session(), timing.connection(), null, new RetryOneTime(2));
+        try
+        {
+            client.start();
+            Assert.assertTrue(timing.acquireSemaphore(counter));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    private class CountingEnsembleProvider implements EnsembleProvider
+    {
+        private final Semaphore getConnectionStringCounter;
+
+        public CountingEnsembleProvider(Semaphore getConnectionStringCounter)
+        {
+            this.getConnectionStringCounter = getConnectionStringCounter;
+        }
+
+        @Override
+        public void start()
+        {
+            // NOP
+        }
+
+        @Override
+        public String getConnectionString()
+        {
+            getConnectionStringCounter.release();
+            return server.getConnectString();
+        }
+
+        @Override
+        public void close()
+        {
+            // NOP
+        }
+
+        @Override
+        public void setConnectionString(String connectionString)
+        {
+            // NOP
+        }
+
+        @Override
+        public boolean updateServerListEnabled()
+        {
+            return false;
+        }
+    }
+}
index 251baa9..0c8ddf8 100644 (file)
@@ -315,6 +315,18 @@ public class ConnectionStateManager implements Closeable
                 }
             }
         }
+        else if ( currentConnectionState == ConnectionState.LOST )
+        {
+            try
+            {
+                // give ConnectionState.checkTimeouts() a chance to run, reset ensemble providers, etc.
+                client.getZookeeperClient().getZooKeeper();
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not get ZooKeeper", e);
+            }
+        }
     }
 
     private void setCurrentConnectionState(ConnectionState newConnectionState)
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java b/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java
new file mode 100644 (file)
index 0000000..73d94a5
--- /dev/null
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.ensemble;
+
+import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class TestEnsembleProvider extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+
+    @Test
+    public void testBasic()
+    {
+        Semaphore counter = new Semaphore(0);
+        final CuratorFramework client = newClient(counter);
+        try
+        {
+            client.start();
+            Assert.assertTrue(timing.acquireSemaphore(counter));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testAfterSessionExpiration() throws Exception
+    {
+        TestingServer oldServer = server;
+        Semaphore counter = new Semaphore(0);
+        final CuratorFramework client = newClient(counter);
+        try
+        {
+            client.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        lostLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+            server.stop();
+
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            counter.drainPermits();
+            for ( int i = 0; i < 5; ++i )
+            {
+                // the ensemble provider should still be called periodically when the connection is lost
+                Assert.assertTrue(timing.acquireSemaphore(counter), "Failed when i is: " + i);
+            }
+
+            server = new TestingServer();   // this changes the CountingEnsembleProvider's value for getConnectionString() - connection should notice this and recover
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(oldServer);
+        }
+    }
+
+    private CuratorFramework newClient(Semaphore counter)
+    {
+        return CuratorFrameworkFactory.builder()
+            .ensembleProvider(new CountingEnsembleProvider(counter))
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection())
+            .retryPolicy(new RetryOneTime(1))
+            .build();
+    }
+
+    private class CountingEnsembleProvider implements EnsembleProvider
+    {
+        private final Semaphore getConnectionStringCounter;
+
+        public CountingEnsembleProvider(Semaphore getConnectionStringCounter)
+        {
+            this.getConnectionStringCounter = getConnectionStringCounter;
+        }
+
+        @Override
+        public void start()
+        {
+            // NOP
+        }
+
+        @Override
+        public String getConnectionString()
+        {
+            getConnectionStringCounter.release();
+            return server.getConnectString();
+        }
+
+        @Override
+        public void close()
+        {
+            // NOP
+        }
+
+        @Override
+        public void setConnectionString(String connectionString)
+        {
+            // NOP
+        }
+
+        @Override
+        public boolean updateServerListEnabled()
+        {
+            return false;
+        }
+    }
+}
index 52446df..7c4af65 100644 (file)
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class BaseClassForTests
 {
-    protected TestingServer server;
+    protected volatile TestingServer server;
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;