CURATOR-505
authorrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 14:02:03 +0000 (09:02 -0500)
committerrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 14:02:03 +0000 (09:02 -0500)
A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreated it in order to try to re-obtain leadership, etc.

This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open" (based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent.

19 files changed:
curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java [new file with mode: 0644]
curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java [new file with mode: 0644]
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java [new file with mode: 0644]
curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java [new file with mode: 0644]
curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java [new file with mode: 0644]
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java

index 3737faa..6513716 100644 (file)
@@ -357,4 +357,14 @@ public interface CuratorFramework extends Closeable
      * @since 4.1.0
      */
     CompletableFuture<Void> runSafe(Runnable runnable);
+
+    /**
+     * Uses the configured {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}
+     * to decorate the given listener. You should always decorate connection state listeners via
+     * this method. See the Curator recipes for examples.
+     *
+     * @param actual listener to decorate
+     * @return decorated listener
+     */
+    ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual);
 }
index 395df71..0cc6e0d 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.curator.framework.imps.GzipCompressionProvider;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
@@ -47,6 +48,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -151,6 +153,7 @@ public class CuratorFrameworkFactory
         private boolean zk34CompatibilityMode = isZK34();
         private int waitForShutdownTimeoutMs = 0;
         private Executor runSafeService = null;
+        private ConnectionStateListenerDecorator connectionStateListenerDecorator = ConnectionStateListenerDecorator.standard;
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -494,6 +497,23 @@ public class CuratorFrameworkFactory
             return this;
         }
 
+        /**
+         * Sets the connection state listener decorator. Curator recipes (and proper client code)
+         * will always decorate connection state listeners via this decorator. For example,
+         * you can set use {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
+         * via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)}
+         * or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)}
+         *
+         * @param connectionStateListenerDecorator decorator to use
+         * @return this
+         * @since 4.2.0
+         */
+        public Builder connectionStateListenerFactory(ConnectionStateListenerDecorator connectionStateListenerDecorator)
+        {
+            this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
+            return this;
+        }
+
         public Executor getRunSafeService()
         {
             return runSafeService;
@@ -641,6 +661,11 @@ public class CuratorFrameworkFactory
             return canBeReadOnly;
         }
 
+        public ConnectionStateListenerDecorator getConnectionStateListenerDecorator()
+        {
+            return connectionStateListenerDecorator;
+        }
+
         private Builder()
         {
         }
index 736b737..81cae74 100644 (file)
@@ -41,6 +41,7 @@ import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.framework.state.ConnectionStateManager;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
@@ -90,6 +91,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final SchemaSet schemaSet;
     private final boolean zk34CompatibilityMode;
     private final Executor runSafeService;
+    private final ConnectionStateListenerDecorator connectionStateListenerDecorator;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -147,6 +149,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
         schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
         zk34CompatibilityMode = builder.isZk34CompatibilityMode();
+        connectionStateListenerDecorator = builder.getConnectionStateListenerDecorator();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -257,6 +260,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
+        connectionStateListenerDecorator = parent.connectionStateListenerDecorator;
     }
 
     @Override
@@ -589,6 +593,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return schemaSet;
     }
 
+    @Override
+    public ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual)
+    {
+        return connectionStateListenerDecorator.decorateListener(this, actual);
+    }
+
     ACLProvider getAclProvider()
     {
         return aclProvider;
index fd4497a..60ae501 100644 (file)
@@ -50,7 +50,10 @@ public class ListenerContainer<T> implements Listenable<T>
     @Override
     public void removeListener(T listener)
     {
-        listeners.remove(listener);
+        if ( listener != null )
+        {
+            listeners.remove(listener);
+        }
     }
 
     /**
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
new file mode 100644 (file)
index 0000000..ad48a15
--- /dev/null
@@ -0,0 +1,79 @@
+package org.apache.curator.framework.state;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.RetrySleeper;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+// must be guarded by sync
+class CircuitBreaker
+{
+    private final RetryPolicy retryPolicy;
+    private final ScheduledExecutorService service;
+
+    private boolean isOpen = false;
+    private int retryCount = 0;
+    private long openStartNanos = 0;
+
+    CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    {
+        this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null");
+        this.service = Objects.requireNonNull(service, "service cannot be null");
+    }
+
+    boolean isOpen()
+    {
+        return isOpen;
+    }
+
+    int getRetryCount()
+    {
+        return retryCount;
+    }
+
+    boolean tryToOpen(Runnable completion)
+    {
+        if ( isOpen )
+        {
+            return false;
+        }
+
+        isOpen = true;
+        retryCount = 0;
+        openStartNanos = System.nanoTime();
+        if ( !tryToRetry(completion) )
+        {
+            close();
+            return false;
+        }
+        return true;
+    }
+
+    boolean tryToRetry(Runnable completion)
+    {
+        if ( !isOpen )
+        {
+            return false;
+        }
+
+        long[] sleepTimeNanos = new long[]{0L};
+        RetrySleeper retrySleeper = (time, unit) -> sleepTimeNanos[0] = unit.toNanos(time);
+        if ( !retryPolicy.allowRetry(retryCount, System.nanoTime() - openStartNanos, retrySleeper) )
+        {
+            return false;
+        }
+        ++retryCount;
+        service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
+        return true;
+    }
+
+    boolean close()
+    {
+        boolean wasOpen = isOpen;
+        retryCount = 0;
+        isOpen = false;
+        openStartNanos = 0;
+        return wasOpen;
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java b/curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
new file mode 100644 (file)
index 0000000..12efad9
--- /dev/null
@@ -0,0 +1,177 @@
+package org.apache.curator.framework.state;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * <p>
+ *     A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
+ *     outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
+ *     Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
+ *     its lock node and try to recreated it in order to try to re-obtain leadership, etc.
+ * </p>
+ *
+ * <p>
+ *     This noisy herding can be avoided by using the circuit breaking listener decorator. When it
+ *     receives {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED}, the circuit
+ *     becomes "open" (based on the provided {@link org.apache.curator.RetryPolicy}) and will ignore
+ *     future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
+ *     goes from {@link org.apache.curator.framework.state.ConnectionState#SUSPENDED} to
+ *     {@link org.apache.curator.framework.state.ConnectionState#LOST} the first LOST state <i>is</i> sent.
+ * </p>
+ *
+ * <p>
+ *     When the circuit decorator is closed, all connection state changes are forwarded to the managed
+ *     listener. When the first disconnected state is received, the circuit becomes open. The state change
+ *     that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to
+ *     get a delay amount. While the delay is active, the decorator will store state changes but will not
+ *     forward them to the managed listener (except, however, the first time the state changes from SUSPENDED to LOST).
+ *     When the delay elapses, if the connection has been restored, the circuit closes and forwards the
+ *     new state to the managed listener. If the connection has not been restored, the RetryPolicy is checked
+ *     again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however, the
+ *     RetryPolicy indicates that retries are exhausted then the circuit closes - if the current state
+ *     is different than the state that caused the circuit to open it is forwarded to the managed listener.
+ * </p>
+ */
+public class CircuitBreakingConnectionStateListener implements ConnectionStateListener
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final ConnectionStateListener listener;
+    private final CircuitBreaker circuitBreaker;
+
+    // guarded by sync
+    private boolean circuitLostHasBeenSent;
+    // guarded by sync
+    private ConnectionState circuitLastState;
+    // guarded by sync
+    private ConnectionState circuitInitialState;
+
+    /**
+     * @param client Curator instance
+     * @param listener listener to manage
+     * @param retryPolicy breaking policy to use
+     */
+    public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy)
+    {
+        this(client, listener, retryPolicy, ThreadUtils.newSingleThreadScheduledExecutor("CircuitBreakingConnectionStateListener"));
+    }
+
+    /**
+     * @param client Curator instance
+     * @param listener listener to manage
+     * @param retryPolicy breaking policy to use
+     * @param service scheduler to use
+     */
+    public CircuitBreakingConnectionStateListener(CuratorFramework client, ConnectionStateListener listener, RetryPolicy retryPolicy, ScheduledExecutorService service)
+    {
+        this.client = client;
+        this.listener = Objects.requireNonNull(listener, "listener cannot be null");
+        circuitBreaker = new CircuitBreaker(retryPolicy, service);
+        reset();
+    }
+
+    @Override
+    public synchronized void stateChanged(CuratorFramework client, ConnectionState newState)
+    {
+        if ( circuitBreaker.isOpen() )
+        {
+            handleOpenStateChange(newState);
+        }
+        else
+        {
+            handleClosedStateChange(newState);
+        }
+    }
+
+    /**
+     * Returns true if the circuit is open
+     *
+     * @return true/false
+     */
+    public synchronized boolean isOpen()
+    {
+        return circuitBreaker.isOpen();
+    }
+
+    private synchronized void handleClosedStateChange(ConnectionState newState)
+    {
+        if ( !newState.isConnected() )
+        {
+            if ( circuitBreaker.tryToOpen(this::checkCloseCircuit) )
+            {
+                log.info("Circuit is opening. State: {} post-retryCount: {}", newState, circuitBreaker.getRetryCount());
+                circuitLastState = circuitInitialState = newState;
+                circuitLostHasBeenSent = (newState == ConnectionState.LOST);
+            }
+            else
+            {
+                log.debug("Could not open circuit breaker. State: {}", newState);
+            }
+        }
+        callListener(circuitInitialState);
+    }
+
+    private synchronized void handleOpenStateChange(ConnectionState newState)
+    {
+        if ( circuitLostHasBeenSent || (newState != ConnectionState.LOST) )
+        {
+            log.debug("Circuit is open. Ignoring state change: {}", newState);
+            circuitLastState = newState;
+        }
+        else
+        {
+            circuitLostHasBeenSent = true;
+            circuitInitialState = ConnectionState.LOST;
+            circuitLastState = newState;
+            log.debug("Circuit is open. State changed to LOST. Sending to listener.");
+            callListener(circuitInitialState);
+        }
+    }
+
+    private synchronized void checkCloseCircuit()
+    {
+        if ( (circuitLastState == null) || circuitLastState.isConnected() )
+        {
+            log.info("Circuit is closing. Initial state: {} - Last state: {}", circuitInitialState, circuitLastState);
+            closeCircuit();
+        }
+        else if ( circuitBreaker.tryToRetry(this::checkCloseCircuit) )
+        {
+            log.debug("Circuit open is continuing due to retry. State: {} post-retryCount: {}", circuitLastState, circuitBreaker.getRetryCount());
+        }
+        else
+        {
+            log.info("Circuit is closing due to retries exhausted. Initial state: {} - Last state: {}", circuitInitialState, circuitLastState);
+            closeCircuit();
+        }
+    }
+
+    private synchronized void callListener(ConnectionState newState)
+    {
+        if ( newState != null )
+        {
+            listener.stateChanged(client, newState);
+        }
+    }
+
+    private synchronized void closeCircuit()
+    {
+        ConnectionState stateToSend = (circuitLastState == circuitInitialState) ? null : circuitLastState;
+        reset();
+        callListener(stateToSend);
+    }
+
+    private synchronized void reset()
+    {
+        circuitLastState = null;
+        circuitInitialState = null;
+        circuitLostHasBeenSent = false;
+        circuitBreaker.close();
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
new file mode 100644 (file)
index 0000000..0f11c46
--- /dev/null
@@ -0,0 +1,63 @@
+package org.apache.curator.framework.state;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * <p>
+ *     Allows for the enhancement of the {@link org.apache.curator.framework.state.ConnectionStateListener} instances
+ *     used with Curator. Client code that sets a ConnectionStateListener should always wrap it using the configured
+ *     ConnectionStateListenerDecorator. All Curator recipes do this.
+ * </p>
+ *
+ * <p>
+ *     E.g.
+ *
+ * <code><pre>
+ * CuratorFramework client ...
+ * ConnectionStateListener listener = ...
+ * ConnectionStateListener wrappedListener = client.wrapConnectionStateListener(listener);
+ *
+ * ...
+ *
+ * client.getConnectionStateListenable().addListener(wrappedListener);
+ *
+ * // later, to remove...
+ * client.getConnectionStateListenable().removeListener(wrappedListener);
+ * </pre></code>
+ * </p>
+ */
+@FunctionalInterface
+public interface ConnectionStateListenerDecorator
+{
+    ConnectionStateListener decorateListener(CuratorFramework client, ConnectionStateListener actual);
+
+    /**
+     * Pass through - does no decoration
+     */
+    ConnectionStateListenerDecorator standard = (__, actual) -> actual;
+
+    /**
+     * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     *
+     * @param retryPolicy the circuit breaking policy to use
+     * @return new decorator
+     */
+    static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy)
+    {
+        return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy);
+    }
+
+    /**
+     * Decorates the listener with circuit breaking behavior using {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}
+     *
+     * @param retryPolicy the circuit breaking policy to use
+     * @param service the scheduler to use
+     * @return new decorator
+     */
+    static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service)
+    {
+        return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy, service);
+    }
+}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
new file mode 100644 (file)
index 0000000..77ec20f
--- /dev/null
@@ -0,0 +1,49 @@
+package org.apache.curator.framework.state;
+
+import org.apache.curator.retry.RetryNTimes;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestCircuitBreaker
+{
+    @Test
+    public void testBasic()
+    {
+        final int retryQty = 1;
+        final Duration delay = Duration.ofSeconds(10);
+
+        Duration[] lastDelay = new Duration[]{Duration.ZERO};
+        ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
+        {
+            @Override
+            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+            {
+                lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
+                command.run();
+                return null;
+            }
+        };
+        CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service);
+        AtomicInteger counter = new AtomicInteger(0);
+
+        Assert.assertTrue(circuitBreaker.tryToOpen(counter::incrementAndGet));
+        Assert.assertEquals(lastDelay[0], delay);
+
+        Assert.assertFalse(circuitBreaker.tryToOpen(counter::incrementAndGet));
+        Assert.assertEquals(circuitBreaker.getRetryCount(), 1);
+        Assert.assertEquals(counter.get(), 1);
+        Assert.assertFalse(circuitBreaker.tryToRetry(counter::incrementAndGet));
+        Assert.assertEquals(circuitBreaker.getRetryCount(), 1);
+        Assert.assertEquals(counter.get(), 1);
+
+        Assert.assertTrue(circuitBreaker.close());
+        Assert.assertEquals(circuitBreaker.getRetryCount(), 0);
+        Assert.assertFalse(circuitBreaker.close());
+    }
+}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java
new file mode 100644 (file)
index 0000000..36a1954
--- /dev/null
@@ -0,0 +1,137 @@
+package org.apache.curator.framework.state;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.RetrySleeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.compatibility.Timing2;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class TestCircuitBreakingConnectionStateListener
+{
+    private final CuratorFramework dummyClient = CuratorFrameworkFactory.newClient("foo", new RetryOneTime(1));
+    private final Timing2 timing = new Timing2();
+    private final Timing2 retryTiming = timing.multiple(.25);
+    private final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
+
+    private static class RecordingListener implements ConnectionStateListener
+    {
+        final BlockingQueue<ConnectionState> stateChanges = new LinkedBlockingQueue<>();
+
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            stateChanges.offer(newState);
+        }
+    }
+
+    private class TestRetryPolicy extends RetryForever
+    {
+        volatile boolean isRetrying = true;
+
+        public TestRetryPolicy()
+        {
+            super(retryTiming.milliseconds());
+        }
+
+        @Override
+        public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
+        {
+            return isRetrying && super.allowRetry(retryCount, elapsedTimeMs, sleeper);
+        }
+    }
+
+    @AfterClass
+    public void tearDown()
+    {
+        service.shutdownNow();
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        TestRetryPolicy retryPolicy = new TestRetryPolicy();
+        final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
+
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
+        listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);  // 2nd suspended is ignored
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+
+        synchronized(listener)  // don't let retry policy run while we're pushing state changes
+        {
+            listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);   // all further events are ignored
+            listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);   // all further events are ignored
+            listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);   // all further events are ignored
+            listener.stateChanged(dummyClient, ConnectionState.LOST);   // all further events are ignored
+            listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);   // all further events are ignored - this will be the last event
+        }
+        retryTiming.multiple(2).sleep();
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+
+        retryPolicy.isRetrying = false; // retry policy will return false
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
+    }
+
+    @Test
+    public void testResetsAfterReconnect() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        TestRetryPolicy retryPolicy = new TestRetryPolicy();
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
+
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);   // second LOST ignored
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+
+        listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);   // causes circuit to close on next retry
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
+    }
+
+    @Test
+    public void testRetryNever() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        RetryPolicy retryNever = (retryCount, elapsedTimeMs, sleeper) -> false;
+        CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryNever, service);
+
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        listener.stateChanged(dummyClient, ConnectionState.LOST);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertFalse(listener.isOpen());
+    }
+
+    @Test
+    public void testRetryOnce() throws Exception
+    {
+        RecordingListener recordingListener = new RecordingListener();
+        RetryPolicy retryOnce = new RetryOneTime(retryTiming.milliseconds());
+        final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
+
+        synchronized(listener)  // don't let retry policy run while we're pushing state changes
+        {
+            listener.stateChanged(dummyClient, ConnectionState.LOST);
+            listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+            Assert.assertTrue(listener.isOpen());
+        }
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+        Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
+        Assert.assertFalse(listener.isOpen());
+    }
+}
index 9687e1b..1ba88c3 100644 (file)
@@ -64,32 +64,7 @@ public class NodeCache implements Closeable
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
     private final AtomicBoolean isConnected = new AtomicBoolean(true);
-    private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
-            {
-                if ( isConnected.compareAndSet(false, true) )
-                {
-                    try
-                    {
-                        reset();
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        log.error("Trying to reset after reconnection", e);
-                    }
-                }
-            }
-            else
-            {
-                isConnected.set(false);
-            }
-        }
-    };
+    private volatile ConnectionStateListener connectionStateListener;
 
     private Watcher watcher = new Watcher()
     {
@@ -143,6 +118,8 @@ public class NodeCache implements Closeable
         this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.dataIsCompressed = dataIsCompressed;
+
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     public CuratorFramework getClient()
@@ -196,7 +173,7 @@ public class NodeCache implements Closeable
             // has something to do with Guava's cache and circular references
             connectionStateListener = null;
             watcher = null;
-        }        
+        }
     }
 
     /**
@@ -348,7 +325,7 @@ public class NodeCache implements Closeable
             }
         }
     }
-    
+
     /**
      * Default behavior is just to log the exception
      *
@@ -358,4 +335,27 @@ public class NodeCache implements Closeable
     {
         log.error("", e);
     }
+
+    private void handleStateChange(ConnectionState newState)
+    {
+        if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+        {
+            if ( isConnected.compareAndSet(false, true) )
+            {
+                try
+                {
+                    reset();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Trying to reset after reconnection", e);
+                }
+            }
+        }
+        else
+        {
+            isConnected.set(false);
+        }
+    }
 }
index bdc73cc..14608ba 100644 (file)
@@ -128,14 +128,7 @@ public class PathChildrenCache implements Closeable
     @VisibleForTesting
     volatile Exchanger<Object> rebuildTestExchanger;
 
-    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            handleStateChange(newState);
-        }
-    };
+    private volatile ConnectionStateListener connectionStateListener;
     public static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
 
     /**
@@ -225,6 +218,7 @@ public class PathChildrenCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.executorService = executorService;
         ensureContainers = new EnsureContainers(client, path);
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
index f42c1d5..a3b1d23 100644 (file)
@@ -534,15 +534,7 @@ public class TreeCache implements Closeable
     private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
     private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
     private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
-
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            handleStateChange(newState);
-        }
-    };
+    private final ConnectionStateListener connectionStateListener;
 
     static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache");
 
@@ -586,6 +578,7 @@ public class TreeCache implements Closeable
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
         this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
index bb8aa73..22cf3af 100644 (file)
@@ -74,15 +74,7 @@ public class LeaderLatch implements Closeable
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
-
-    private final ConnectionStateListener listener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            handleStateChange(newState);
-        }
-    };
+    private final ConnectionStateListener listener;
 
     private static final String LOCK_NAME = "latch-";
 
@@ -149,6 +141,7 @@ public class LeaderLatch implements Closeable
         this.latchPath = PathUtils.validatePath(latchPath);
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
+        listener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
index 0bb448a..108d66e 100644 (file)
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
@@ -68,6 +69,7 @@ public class LeaderSelector implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final LeaderSelectorListener listener;
+    private final ConnectionStateListener connectionStateListener;
     private final CloseableExecutorService executorService;
     private final InterProcessMutex mutex;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -145,6 +147,8 @@ public class LeaderSelector implements Closeable
         this.listener = new WrappedListener(this, listener);
         hasLeadership = false;
 
+        connectionStateListener = client.decorateConnectionStateListener(listener);
+
         this.executorService = executorService;
         mutex = new InterProcessMutex(client, leaderPath)
         {
@@ -215,7 +219,7 @@ public class LeaderSelector implements Closeable
         Preconditions.checkState(!executorService.isShutdown(), "Already started");
         Preconditions.checkState(!hasLeadership, "Already has leadership");
 
-        client.getConnectionStateListenable().addListener(listener);
+        client.getConnectionStateListenable().addListener(connectionStateListener);
         requeue();
     }
 
@@ -271,7 +275,7 @@ public class LeaderSelector implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
 
-        client.getConnectionStateListenable().removeListener(listener);
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
         executorService.close();
         ourTask.set(null);
     }
index 81e8dd9..293f46e 100644 (file)
@@ -145,17 +145,7 @@ public class PersistentNode implements Closeable
             }
         }
     };
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework dummy, ConnectionState newState)
-        {
-            if ( (newState == ConnectionState.RECONNECTED) && isActive() )
-            {
-                createNode();
-            }
-        }
-    };
+    private final ConnectionStateListener connectionStateListener;
 
     @VisibleForTesting
     volatile CountDownLatch debugCreateNodeLatch = null;
@@ -213,6 +203,7 @@ public class PersistentNode implements Closeable
         };
 
         this.data.set(Arrays.copyOf(data, data.length));
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     private void processBackgroundCallbackClosedState(CuratorEvent event)
@@ -554,4 +545,12 @@ public class PersistentNode implements Closeable
     {
         return authFailure.get();
     }
+
+    private void handleStateChange(ConnectionState newState)
+    {
+        if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+        {
+            createNode();
+        }
+    }
 }
index 5d7abce..5f3e183 100644 (file)
@@ -73,26 +73,7 @@ public class SharedValue implements Closeable, SharedValueReader
         }
     };
 
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            notifyListenerOfStateChanged(newState);
-            if ( newState.isConnected() )
-            {
-                try
-                {
-                    readValueAndNotifyListenersInBackground();
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    log.error("Could not read value after reconnect", e);
-                }
-            }
-        }
-    };
+    private final ConnectionStateListener connectionStateListener;
 
     private enum State
     {
@@ -113,6 +94,7 @@ public class SharedValue implements Closeable, SharedValueReader
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         this.watcher = new SharedValueCuratorWatcher();
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     @VisibleForTesting
@@ -124,6 +106,7 @@ public class SharedValue implements Closeable, SharedValueReader
         // inject watcher for testing
         this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     @Override
@@ -334,4 +317,21 @@ public class SharedValue implements Closeable, SharedValueReader
                 }
             );
     }
+
+    private void handleStateChange(ConnectionState newState)
+    {
+        notifyListenerOfStateChanged(newState);
+        if ( newState.isConnected() )
+        {
+            try
+            {
+                readValueAndNotifyListenersInBackground();
+            }
+            catch ( Exception e )
+            {
+                ThreadUtils.checkInterrupted(e);
+                log.error("Could not read value after reconnect", e);
+            }
+        }
+    }
 }
index 011e4a0..439f6c8 100644 (file)
@@ -28,8 +28,10 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
@@ -59,6 +61,44 @@ public class TestLeaderLatch extends BaseClassForTests
     private static final int MAX_LOOPS = 5;
 
     @Test
+    public void testWithCircuitBreaker() throws Exception
+    {
+        Timing2 timing = new Timing2();
+        ConnectionStateListenerDecorator factory = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.milliseconds()));
+        try ( CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .retryPolicy(new RetryOneTime(1))
+            .connectionStateListenerFactory(factory)
+            .connectionTimeoutMs(timing.connection())
+            .sessionTimeoutMs(timing.session())
+            .build() )
+        {
+            client.start();
+            AtomicInteger resetCount = new AtomicInteger(0);
+            LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+            {
+                @Override
+                void reset() throws Exception
+                {
+                    resetCount.incrementAndGet();
+                    super.reset();
+                }
+            };
+            latch.start();
+            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+            for ( int i = 0; i < 5; ++i )
+            {
+                server.stop();
+                server.restart();
+                timing.sleepABit();
+            }
+            Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+            Assert.assertEquals(resetCount.get(), 2);
+        }
+    }
+
+    @Test
     public void testUncreatedPathGetLeader() throws Exception
     {
         try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
index d1a31ad..4270116 100644 (file)
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -23,14 +24,15 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -45,17 +47,16 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T>                           discovery;
-    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache                                 cache;
-    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T> discovery;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final PathChildrenCache cache;
+    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
+    private final ConcurrentMap<ServiceCacheListener, ConnectionStateListener> connectionStateListeners = Maps.newConcurrentMap();
 
     private enum State
     {
-        LATENT,
-        STARTED,
-        STOPPED
+        LATENT, STARTED, STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -123,18 +124,15 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
-                {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
-                        return null;
-                    }
-                }
-            );
+        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+        {
+            @Override
+            public Void apply(ServiceCacheListener listener)
+            {
+                discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener));
+                return null;
+            }
+        });
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -146,59 +144,56 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     public void addListener(ServiceCacheListener listener)
     {
         listenerContainer.addListener(listener);
-        discovery.getClient().getConnectionStateListenable().addListener(listener);
+        discovery.getClient().getConnectionStateListenable().addListener(wrap(listener));
     }
 
     @Override
     public void addListener(ServiceCacheListener listener, Executor executor)
     {
         listenerContainer.addListener(listener, executor);
-        discovery.getClient().getConnectionStateListenable().addListener(listener, executor);
+        discovery.getClient().getConnectionStateListenable().addListener(wrap(listener), executor);
     }
 
     @Override
     public void removeListener(ServiceCacheListener listener)
     {
         listenerContainer.removeListener(listener);
-        discovery.getClient().getConnectionStateListenable().removeListener(listener);
+        discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener));
     }
 
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean         notifyListeners = false;
+        boolean notifyListeners = false;
         switch ( event.getType() )
         {
-            case CHILD_ADDED:
-            case CHILD_UPDATED:
-            {
-                addInstance(event.getData(), false);
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+        {
+            addInstance(event.getData(), false);
+            notifyListeners = true;
+            break;
+        }
 
-            case CHILD_REMOVED:
-            {
-                instances.remove(instanceIdFromData(event.getData()));
-                notifyListeners = true;
-                break;
-            }
+        case CHILD_REMOVED:
+        {
+            instances.remove(instanceIdFromData(event.getData()));
+            notifyListeners = true;
+            break;
+        }
         }
 
         if ( notifyListeners )
         {
-            listenerContainer.forEach
-            (
-                new Function<ServiceCacheListener, Void>()
+            listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
+            {
+                @Override
+                public Void apply(ServiceCacheListener listener)
                 {
-                    @Override
-                    public Void apply(ServiceCacheListener listener)
-                    {
-                        listener.cacheChanged();
-                        return null;
-                    }
+                    listener.cacheChanged();
+                    return null;
                 }
-            );
+            });
         }
     }
 
@@ -209,8 +204,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
     private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
     {
-        String                  instanceId = instanceIdFromData(childData);
-        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+        String instanceId = instanceIdFromData(childData);
+        ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
         if ( onlyIfAbsent )
         {
             instances.putIfAbsent(instanceId, serviceInstance);
@@ -221,4 +216,16 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         }
         cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
     }
+
+    private ConnectionStateListener wrap(ServiceCacheListener listener)
+    {
+        ConnectionStateListener wrapped = discovery.getClient().decorateConnectionStateListener(listener);
+        connectionStateListeners.put(listener, wrapped);
+        return wrapped;
+    }
+
+    private ConnectionStateListener unwrap(ServiceCacheListener listener)
+    {
+        return connectionStateListeners.remove(listener);
+    }
 }
index 476705c..2e10095 100644 (file)
@@ -65,29 +65,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
     private final boolean watchInstances;
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) )
-            {
-                try
-                {
-                    log.debug("Re-registering due to reconnection");
-                    reRegisterServices();
-                }
-                catch (InterruptedException ex)
-                {
-                    Thread.currentThread().interrupt();
-                }
-                catch ( Exception e )
-                {
-                    log.error("Could not re-register instances after reconnection", e);
-                }
-            }
-        }
-    };
+    private final ConnectionStateListener connectionStateListener;
 
     private static class Entry<T>
     {
@@ -119,6 +97,7 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             entry.cache = makeNodeCache(thisInstance);
             services.put(thisInstance.getId(), entry);
         }
+        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
@@ -530,4 +509,24 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             }
         }
     }
+
+    private void handleStateChange(ConnectionState newState)
+    {
+        if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) )
+        {
+            try
+            {
+                log.debug("Re-registering due to reconnection");
+                reRegisterServices();
+            }
+            catch (InterruptedException ex)
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not re-register instances after reconnection", e);
+            }
+        }
+    }
 }