CURATOR-505 - interim checking - refactoring, simplifications, more testing, and...
authorrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 17:10:52 +0000 (12:10 -0500)
committerrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 17:10:52 +0000 (12:10 -0500)
curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
src/site/confluence/errors.confluence
src/site/confluence/utilities.confluence

index 6513716..2657781 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -367,4 +368,13 @@ public interface CuratorFramework extends Closeable
      * @return decorated listener
      */
     ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual);
+
+    /**
+     * Returns a facade of the current instance that uses the given connection state listener
+     * decorator instead of the configured one
+     *
+     * @param newDecorator decorator to use
+     * @return facade
+     */
+    CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator);
 }
index 0cc6e0d..283a093 100644 (file)
@@ -508,7 +508,7 @@ public class CuratorFrameworkFactory
          * @return this
          * @since 4.2.0
          */
-        public Builder connectionStateListenerFactory(ConnectionStateListenerDecorator connectionStateListenerDecorator)
+        public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator)
         {
             this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
             return this;
index 81cae74..f210021 100644 (file)
@@ -236,6 +236,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
     {
+        this(parent, parent.connectionStateListenerDecorator);
+    }
+
+    private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
+    {
         client = parent.client;
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
@@ -260,7 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
-        connectionStateListenerDecorator = parent.connectionStateListenerDecorator;
+        this.connectionStateListenerDecorator = connectionStateListenerDecorator;
     }
 
     @Override
@@ -599,6 +604,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return connectionStateListenerDecorator.decorateListener(this, actual);
     }
 
+    @Override
+    public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator)
+    {
+        return new CuratorFrameworkImpl(this, newDecorator);
+    }
+
     ACLProvider getAclProvider()
     {
         return aclProvider;
index ad48a15..504edbc 100644 (file)
@@ -14,7 +14,7 @@ class CircuitBreaker
 
     private boolean isOpen = false;
     private int retryCount = 0;
-    private long openStartNanos = 0;
+    private long startNanos = 0;
 
     CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
     {
@@ -41,13 +41,13 @@ class CircuitBreaker
 
         isOpen = true;
         retryCount = 0;
-        openStartNanos = System.nanoTime();
-        if ( !tryToRetry(completion) )
+        startNanos = System.nanoTime();
+        if ( tryToRetry(completion) )
         {
-            close();
-            return false;
+            return true;
         }
-        return true;
+        close();
+        return false;
     }
 
     boolean tryToRetry(Runnable completion)
@@ -59,13 +59,13 @@ class CircuitBreaker
 
         long[] sleepTimeNanos = new long[]{0L};
         RetrySleeper retrySleeper = (time, unit) -> sleepTimeNanos[0] = unit.toNanos(time);
-        if ( !retryPolicy.allowRetry(retryCount, System.nanoTime() - openStartNanos, retrySleeper) )
+        if ( retryPolicy.allowRetry(retryCount, System.nanoTime() - startNanos, retrySleeper) )
         {
-            return false;
+            ++retryCount;
+            service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
+            return true;
         }
-        ++retryCount;
-        service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
-        return true;
+        return false;
     }
 
     boolean close()
@@ -73,7 +73,7 @@ class CircuitBreaker
         boolean wasOpen = isOpen;
         retryCount = 0;
         isOpen = false;
-        openStartNanos = 0;
+        startNanos = 0;
         return wasOpen;
     }
 }
index 12efad9..dba651a 100644 (file)
@@ -13,7 +13,7 @@ import java.util.concurrent.ScheduledExecutorService;
  *     A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
  *     outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
  *     Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
- *     its lock node and try to recreated it in order to try to re-obtain leadership, etc.
+ *     its lock node and try to recreate it in order to try to re-obtain leadership, etc.
  * </p>
  *
  * <p>
@@ -114,7 +114,7 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
                 log.debug("Could not open circuit breaker. State: {}", newState);
             }
         }
-        callListener(circuitInitialState);
+        callListener(newState);
     }
 
     private synchronized void handleOpenStateChange(ConnectionState newState)
@@ -126,11 +126,10 @@ public class CircuitBreakingConnectionStateListener implements ConnectionStateLi
         }
         else
         {
-            circuitLostHasBeenSent = true;
-            circuitInitialState = ConnectionState.LOST;
-            circuitLastState = newState;
             log.debug("Circuit is open. State changed to LOST. Sending to listener.");
-            callListener(circuitInitialState);
+            circuitLostHasBeenSent = true;
+            circuitLastState = circuitInitialState = ConnectionState.LOST;
+            callListener(ConnectionState.LOST);
         }
     }
 
index 0f11c46..0ac808b 100644 (file)
@@ -17,14 +17,14 @@ import java.util.concurrent.ScheduledExecutorService;
  * <code><pre>
  * CuratorFramework client ...
  * ConnectionStateListener listener = ...
- * ConnectionStateListener wrappedListener = client.wrapConnectionStateListener(listener);
+ * ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
  *
  * ...
  *
- * client.getConnectionStateListenable().addListener(wrappedListener);
+ * client.getConnectionStateListenable().addListener(decoratedListener);
  *
  * // later, to remove...
- * client.getConnectionStateListenable().removeListener(wrappedListener);
+ * client.getConnectionStateListenable().removeListener(decoratedListener);
  * </pre></code>
  * </p>
  */
index 77ec20f..e2daa96 100644 (file)
@@ -1,7 +1,9 @@
 package org.apache.curator.framework.state;
 
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -12,23 +14,30 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestCircuitBreaker
 {
+    private Duration[] lastDelay = new Duration[]{Duration.ZERO};
+    private ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
+    {
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+        {
+            lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
+            command.run();
+            return null;
+        }
+    };
+
+    @AfterClass
+    public void tearDown()
+    {
+        service.shutdownNow();
+    }
+
     @Test
     public void testBasic()
     {
         final int retryQty = 1;
         final Duration delay = Duration.ofSeconds(10);
 
-        Duration[] lastDelay = new Duration[]{Duration.ZERO};
-        ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
-        {
-            @Override
-            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-            {
-                lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
-                command.run();
-                return null;
-            }
-        };
         CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service);
         AtomicInteger counter = new AtomicInteger(0);
 
@@ -46,4 +55,15 @@ public class TestCircuitBreaker
         Assert.assertEquals(circuitBreaker.getRetryCount(), 0);
         Assert.assertFalse(circuitBreaker.close());
     }
+
+    @Test
+    public void testVariousOpenRetryFails()
+    {
+        CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service);
+        Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
+        Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
+        Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
+        Assert.assertTrue(circuitBreaker.close());
+        Assert.assertFalse(circuitBreaker.close());
+    }
 }
index 36a1954..1712eed 100644 (file)
@@ -8,7 +8,8 @@ import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.Timing2;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -20,7 +21,7 @@ public class TestCircuitBreakingConnectionStateListener
     private final CuratorFramework dummyClient = CuratorFrameworkFactory.newClient("foo", new RetryOneTime(1));
     private final Timing2 timing = new Timing2();
     private final Timing2 retryTiming = timing.multiple(.25);
-    private final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
+    private volatile ScheduledThreadPoolExecutor service;
 
     private static class RecordingListener implements ConnectionStateListener
     {
@@ -49,7 +50,13 @@ public class TestCircuitBreakingConnectionStateListener
         }
     }
 
-    @AfterClass
+    @BeforeMethod
+    public void setup()
+    {
+        service = new ScheduledThreadPoolExecutor(1);
+    }
+
+    @AfterMethod
     public void tearDown()
     {
         service.shutdownNow();
@@ -60,7 +67,7 @@ public class TestCircuitBreakingConnectionStateListener
     {
         RecordingListener recordingListener = new RecordingListener();
         TestRetryPolicy retryPolicy = new TestRetryPolicy();
-        final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
 
         listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
@@ -94,8 +101,11 @@ public class TestCircuitBreakingConnectionStateListener
         TestRetryPolicy retryPolicy = new TestRetryPolicy();
         CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
 
-        listener.stateChanged(dummyClient, ConnectionState.LOST);
-        listener.stateChanged(dummyClient, ConnectionState.LOST);   // second LOST ignored
+        synchronized(listener)  // don't let retry policy run while we're pushing state changes
+        {
+            listener.stateChanged(dummyClient, ConnectionState.LOST);
+            listener.stateChanged(dummyClient, ConnectionState.LOST);   // second LOST ignored
+        }
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
         Assert.assertTrue(recordingListener.stateChanges.isEmpty());
 
@@ -112,6 +122,7 @@ public class TestCircuitBreakingConnectionStateListener
 
         listener.stateChanged(dummyClient, ConnectionState.LOST);
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertFalse(listener.isOpen());
         listener.stateChanged(dummyClient, ConnectionState.LOST);
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
         Assert.assertFalse(listener.isOpen());
@@ -122,7 +133,7 @@ public class TestCircuitBreakingConnectionStateListener
     {
         RecordingListener recordingListener = new RecordingListener();
         RetryPolicy retryOnce = new RetryOneTime(retryTiming.milliseconds());
-        final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
 
         synchronized(listener)  // don't let retry policy run while we're pushing state changes
         {
@@ -134,4 +145,65 @@ public class TestCircuitBreakingConnectionStateListener
         Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
         Assert.assertFalse(listener.isOpen());
     }
+
+    @Test
+    public void testSuspendedToLostRatcheting() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        RetryPolicy retryInfinite = new RetryForever(Integer.MAX_VALUE);
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryInfinite, service);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        Assert.assertFalse(listener.isOpen());
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
+
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(listener.isOpen());
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        Assert.assertTrue(listener.isOpen());
+    }
 }
index 439f6c8..bf2abd4 100644 (file)
@@ -64,18 +64,18 @@ public class TestLeaderLatch extends BaseClassForTests
     public void testWithCircuitBreaker() throws Exception
     {
         Timing2 timing = new Timing2();
-        ConnectionStateListenerDecorator factory = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.milliseconds()));
+        ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
         try ( CuratorFramework client = CuratorFrameworkFactory.builder()
             .connectString(server.getConnectString())
             .retryPolicy(new RetryOneTime(1))
-            .connectionStateListenerFactory(factory)
+            .connectionStateListenerDecorator(decorator)
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
             .build() )
         {
             client.start();
             AtomicInteger resetCount = new AtomicInteger(0);
-            LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+            try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
             {
                 @Override
                 void reset() throws Exception
@@ -83,18 +83,20 @@ public class TestLeaderLatch extends BaseClassForTests
                     resetCount.incrementAndGet();
                     super.reset();
                 }
-            };
-            latch.start();
-            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
-
-            for ( int i = 0; i < 5; ++i )
+            } )
             {
-                server.stop();
-                server.restart();
-                timing.sleepABit();
+                latch.start();
+                Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+                for ( int i = 0; i < 5; ++i )
+                {
+                    server.stop();
+                    server.restart();
+                    timing.sleepABit();
+                }
+                Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+                Assert.assertEquals(resetCount.get(), 2);
             }
-            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
-            Assert.assertEquals(resetCount.get(), 2);
         }
     }
 
index b4f6643..97f23fd 100644 (file)
@@ -19,7 +19,8 @@ in a retry mechanism. Thus, the following guarantees can be made:
 h2. Notifications
 Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection.
 
-{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take
+{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection
+disruptions. Clients can monitor these changes and take
 appropriate action. These are the possible state changes:
 
 |CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.|
index 3a62fa5..1971c3c 100644 (file)
@@ -14,6 +14,50 @@ Various static methods to help with using ZooKeeper ZNode paths:
 * getSortedChildren: Return the children of the given path sorted by sequence number
 * makePath: Given a parent path and a child node, create a combined full path
 
+h2. Circuit Breaking ConnectionStateListener
+
+During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these
+messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc.
+
+This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open"
+(based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
+goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent.
+
+When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the
+circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount.
+While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state
+changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener.
+If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however,
+the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is
+forwarded to the managed listener.
+
+You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate
+their ConnectionStateListeners using the configured decorator. E.g.
+
+{code}
+ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...);
+CuratorFramework client = CuratorFrameworkFactory.builder()
+    ...
+    .connectionStateListenerDecorator(decorator)
+    ...
+    .build();
+{code}
+
+If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}.
+
+{code}
+CuratorFramework client ...
+ConnectionStateListener listener = ...
+ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
+
+...
+
+client.getConnectionStateListenable().addListener(decoratedListener);
+
+// later, to remove...
+client.getConnectionStateListenable().removeListener(decoratedListener);
+{code}
+
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer: