CURATOR-460 Adjust to obey sessionExpirationPercent value
authorjavando <antonio.rafael.ar@gmail.com>
Tue, 3 Apr 2018 02:27:13 +0000 (23:27 -0300)
committerjavando <antonio.rafael.ar@gmail.com>
Tue, 3 Apr 2018 02:27:13 +0000 (23:27 -0300)
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java

index 0c8ddf8..fedcedf 100644 (file)
@@ -253,6 +253,7 @@ public class ConnectionStateManager implements Closeable
             {
                 int lastNegotiatedSessionTimeoutMs = client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs();
                 int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
+                useSessionTimeoutMs = sessionExpirationPercent > 0 && startOfSuspendedEpoch != 0 ? (useSessionTimeoutMs * sessionExpirationPercent) / 100 : useSessionTimeoutMs;
                 long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                 long pollMaxMs = useSessionTimeoutMs - elapsedMs;
 
index 011e4a0..af1475d 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.curator.connection.StandardConnectionHandlingPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.TestCleanState;
@@ -222,6 +223,35 @@ public class TestLeaderLatch extends BaseClassForTests
             next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS));
             next.add(states.poll(timing.forSessionSleep().milliseconds(), TimeUnit.MILLISECONDS));
             Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), next.toString());
+
+            latch.close();
+            client.close();
+
+            timing.sleepABit();
+            states.clear();
+            server = new TestingServer();
+            client = CuratorFrameworkFactory.builder()
+                    .connectString(server.getConnectString())
+                    .connectionTimeoutMs(1000)
+                    .sessionTimeoutMs(timing.session())
+                    .retryPolicy(new RetryOneTime(1))
+                    .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
+                    .connectionHandlingPolicy(new StandardConnectionHandlingPolicy(30))
+                    .build();
+            client.getConnectionStateListenable().addListener(stateListener);
+            client.start();
+            latch = new LeaderLatch(client, "/test");
+            latch.addListener(listener);
+            latch.start();
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+            server.close();
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+            next = Lists.newArrayList();
+
+            next.add(states.poll(timing.session() / 3, TimeUnit.MILLISECONDS));
+            next.add(states.poll(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS));
+            Assert.assertTrue(next.equals(Arrays.asList(ConnectionState.LOST.name(), "false")) || next.equals(Arrays.asList("false", ConnectionState.LOST.name())), next.toString());
         }
         finally
         {