wip
authorrandgalt <randgalt@apache.org>
Sat, 22 Aug 2015 15:47:01 +0000 (10:47 -0500)
committerrandgalt <randgalt@apache.org>
Sat, 22 Aug 2015 15:47:01 +0000 (10:47 -0500)
curator-client/src/main/java/org/apache/curator/ConnectionState.java
curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
curator-recipes/src/test/java/org/apache/curator/framework/client/TestResetConnectionWithBackgroundFailure.java
curator-test/src/main/java/org/apache/curator/test/Timing.java

index 1dfdbef..c3d6921 100644 (file)
@@ -52,6 +52,7 @@ class ConnectionState implements Watcher, Closeable
     private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
     private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
+    private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true);
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
     {
@@ -67,6 +68,11 @@ class ConnectionState implements Watcher, Closeable
         zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
     }
 
+    void disableTimeoutChecks()
+    {
+        enableTimeoutChecks.set(false);
+    }
+
     ZooKeeper getZooKeeper() throws Exception
     {
         if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
@@ -81,10 +87,13 @@ class ConnectionState implements Watcher, Closeable
             throw exception;
         }
 
-        boolean localIsConnected = isConnected.get();
-        if ( !localIsConnected )
+        if ( enableTimeoutChecks.get() )
         {
-            checkTimeouts();
+            boolean localIsConnected = isConnected.get();
+            if ( !localIsConnected )
+            {
+                checkTimeouts();
+            }
         }
 
         return zooKeeper.getZooKeeper();
index fbb2f4c..ce6e9d3 100644 (file)
@@ -50,6 +50,7 @@ public class CuratorZookeeperClient implements Closeable
     private final int connectionTimeoutMs;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
+    private final boolean manageTimeouts;
 
     /**
      *
@@ -61,7 +62,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
+        this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true);
     }
 
     /**
@@ -73,7 +74,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false);
+        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true);
     }
 
     /**
@@ -90,6 +91,25 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
     {
+        this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, true);
+    }
+
+    /**
+     * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
+     * @param ensembleProvider the ensemble provider
+     * @param sessionTimeoutMs session timeout
+     * @param connectionTimeoutMs connection timeout
+     * @param watcher default watcher or null
+     * @param retryPolicy the retry policy to use
+     * @param canBeReadOnly if true, allow ZooKeeper client to enter
+     *                      read only mode in case of a network partition. See
+     *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
+     *                      for details
+     * @param manageTimeouts in general, Curator clients try to manage session/connection timeouts. If this is false, that management is turned off
+     */
+    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, boolean manageTimeouts)
+    {
+        this.manageTimeouts = manageTimeouts;
         if ( sessionTimeoutMs < connectionTimeoutMs )
         {
             log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
@@ -100,6 +120,10 @@ public class CuratorZookeeperClient implements Closeable
 
         this.connectionTimeoutMs = connectionTimeoutMs;
         state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
+        if ( !manageTimeouts )
+        {
+            state.disableTimeoutChecks();
+        }
         setRetryPolicy(retryPolicy);
     }
 
@@ -302,9 +326,15 @@ public class CuratorZookeeperClient implements Closeable
         return state.getInstanceIndex();
     }
 
+    /**
+     * Returns true if connection timeouts should cause the retry policy to be checked. If false
+     * is returned, throw a connection exception without retrying
+     *
+     * @return true/false
+     */
     public boolean retryConnectionTimeouts()
     {
-        return true;
+        return manageTimeouts;
     }
 
     void addParentWatcher(Watcher watcher)
index c359fdc..bcbeecd 100644 (file)
@@ -122,15 +122,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
                 }
             },
             builder.getRetryPolicy(),
-            builder.canBeReadOnly()
-        )
-        {
-            @Override
-            public boolean retryConnectionTimeouts()
-            {
-                return !enableSessionExpiredState;
-            }
-        };
+            builder.canBeReadOnly(),
+            !builder.getEnableSessionExpiredState() // inverse is correct here. By default, CuratorZookeeperClient manages timeouts. The new SessionExpiredState needs this disabled.
+        );
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
index 553faac..52e0d07 100644 (file)
@@ -295,7 +295,7 @@ public class ConnectionStateManager implements Closeable
             long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
             if ( elapsedMs >= sessionTimeoutMs )
             {
-                log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+                log.warn(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
                 try
                 {
                     client.getZookeeperClient().reset();
index 150eb50..cd415b1 100644 (file)
@@ -123,7 +123,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests
     {
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
         server.stop();
-        Thread.sleep(timing.multiple(1.2).session());
+        timing.sleepForSession();
         Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
         Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
         server.restart();
index 7d2cb89..b90311b 100644 (file)
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.client;
 
+import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
 {
@@ -53,7 +56,6 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
     {
         server.stop();
 
-        final StringBuilder listenerSequence = new StringBuilder();
         LeaderSelector selector = null;
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
@@ -74,12 +76,13 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
             selector.autoRequeue();
             selector.start();
 
+            final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
             ConnectionStateListener listener1 = new ConnectionStateListener()
             {
                 @Override
                 public void stateChanged(CuratorFramework client, ConnectionState newState)
                 {
-                    listenerSequence.append("-").append(newState);
+                    states.add(newState);
                 }
             };
 
@@ -90,17 +93,21 @@ public class TestResetConnectionWithBackgroundFailure extends BaseClassForTests
 
             log.debug("Stopping ZK server");
             server.stop();
-            timing.forWaiting().sleepABit();
+            timing.sleepForSession();
 
             log.debug("Starting ZK server");
             server.restart();
-            timing.forWaiting().sleepABit();
+
+            Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
 
             log.debug("Stopping ZK server");
             server.close();
-            timing.forWaiting().sleepABit();
 
-            Assert.assertEquals(listenerSequence.toString(), "-CONNECTED-SUSPENDED-LOST-RECONNECTED-SUSPENDED-LOST");
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+            Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
         }
         finally
         {
index 753d62d..fc4b314 100644 (file)
@@ -35,6 +35,7 @@ public class Timing
     private static final int DEFAULT_SECONDS = 10;
     private static final int DEFAULT_WAITING_MULTIPLE = 5;
     private static final double SESSION_MULTIPLE = 1.5;
+    private static final double SESSION_SLEEP_MULTIPLE = 1.75;  // has to be at least session + 2/3 of a session to account for missed heartbeat then session expiration
 
     /**
      * Use the default base time
@@ -200,6 +201,26 @@ public class Timing
     }
 
     /**
+     * Sleep enough so that the session should expire
+     *
+     * @throws InterruptedException if interrupted
+     */
+    public void sleepForSession() throws InterruptedException
+    {
+        TimeUnit.MILLISECONDS.sleep(sessionSleep());
+    }
+
+    /**
+     * Return the value to sleep to ensure a ZK session timeout
+     *
+     * @return session sleep timeout
+     */
+    public int sessionSleep()
+    {
+        return multiple(SESSION_SLEEP_MULTIPLE).session();
+    }
+
+    /**
      * Return the value to use for ZK session timeout
      *
      * @return session timeout