import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
* @return decorated listener
*/
ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual);
+
+ /**
+ * Returns a facade of the current instance that uses the given connection state listener
+ * decorator instead of the configured one
+ *
+ * @param newDecorator decorator to use
+ * @return facade
+ */
+ CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator);
}
* @return this
* @since 4.2.0
*/
- public Builder connectionStateListenerFactory(ConnectionStateListenerDecorator connectionStateListenerDecorator)
+ public Builder connectionStateListenerDecorator(ConnectionStateListenerDecorator connectionStateListenerDecorator)
{
this.connectionStateListenerDecorator = Objects.requireNonNull(connectionStateListenerDecorator, "connectionStateListenerFactory cannot be null");
return this;
protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
{
+ this(parent, parent.connectionStateListenerDecorator);
+ }
+
+ private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
+ {
client = parent.client;
listeners = parent.listeners;
unhandledErrorListeners = parent.unhandledErrorListeners;
zk34CompatibilityMode = parent.zk34CompatibilityMode;
ensembleTracker = null;
runSafeService = parent.runSafeService;
- connectionStateListenerDecorator = parent.connectionStateListenerDecorator;
+ this.connectionStateListenerDecorator = connectionStateListenerDecorator;
}
@Override
return connectionStateListenerDecorator.decorateListener(this, actual);
}
+ @Override
+ public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator)
+ {
+ return new CuratorFrameworkImpl(this, newDecorator);
+ }
+
ACLProvider getAclProvider()
{
return aclProvider;
private boolean isOpen = false;
private int retryCount = 0;
- private long openStartNanos = 0;
+ private long startNanos = 0;
CircuitBreaker(RetryPolicy retryPolicy, ScheduledExecutorService service)
{
isOpen = true;
retryCount = 0;
- openStartNanos = System.nanoTime();
- if ( !tryToRetry(completion) )
+ startNanos = System.nanoTime();
+ if ( tryToRetry(completion) )
{
- close();
- return false;
+ return true;
}
- return true;
+ close();
+ return false;
}
boolean tryToRetry(Runnable completion)
long[] sleepTimeNanos = new long[]{0L};
RetrySleeper retrySleeper = (time, unit) -> sleepTimeNanos[0] = unit.toNanos(time);
- if ( !retryPolicy.allowRetry(retryCount, System.nanoTime() - openStartNanos, retrySleeper) )
+ if ( retryPolicy.allowRetry(retryCount, System.nanoTime() - startNanos, retrySleeper) )
{
- return false;
+ ++retryCount;
+ service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
+ return true;
}
- ++retryCount;
- service.schedule(completion, sleepTimeNanos[0], TimeUnit.NANOSECONDS);
- return true;
+ return false;
}
boolean close()
boolean wasOpen = isOpen;
retryCount = 0;
isOpen = false;
- openStartNanos = 0;
+ startNanos = 0;
return wasOpen;
}
}
* A decorator/proxy for connection state listeners that adds circuit breaking behavior. During network
* outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession.
* Curator recipes respond to these messages by resetting state, etc. E.g. LeaderLatch must delete
- * its lock node and try to recreated it in order to try to re-obtain leadership, etc.
+ * its lock node and try to recreate it in order to try to re-obtain leadership, etc.
* </p>
*
* <p>
log.debug("Could not open circuit breaker. State: {}", newState);
}
}
- callListener(circuitInitialState);
+ callListener(newState);
}
private synchronized void handleOpenStateChange(ConnectionState newState)
}
else
{
- circuitLostHasBeenSent = true;
- circuitInitialState = ConnectionState.LOST;
- circuitLastState = newState;
log.debug("Circuit is open. State changed to LOST. Sending to listener.");
- callListener(circuitInitialState);
+ circuitLostHasBeenSent = true;
+ circuitLastState = circuitInitialState = ConnectionState.LOST;
+ callListener(ConnectionState.LOST);
}
}
* <code><pre>
* CuratorFramework client ...
* ConnectionStateListener listener = ...
- * ConnectionStateListener wrappedListener = client.wrapConnectionStateListener(listener);
+ * ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
*
* ...
*
- * client.getConnectionStateListenable().addListener(wrappedListener);
+ * client.getConnectionStateListenable().addListener(decoratedListener);
*
* // later, to remove...
- * client.getConnectionStateListenable().removeListener(wrappedListener);
+ * client.getConnectionStateListenable().removeListener(decoratedListener);
* </pre></code>
* </p>
*/
package org.apache.curator.framework.state;
+import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryNTimes;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class TestCircuitBreaker
{
+ private Duration[] lastDelay = new Duration[]{Duration.ZERO};
+ private ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
+ {
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
+ command.run();
+ return null;
+ }
+ };
+
+ @AfterClass
+ public void tearDown()
+ {
+ service.shutdownNow();
+ }
+
@Test
public void testBasic()
{
final int retryQty = 1;
final Duration delay = Duration.ofSeconds(10);
- Duration[] lastDelay = new Duration[]{Duration.ZERO};
- ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1)
- {
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
- {
- lastDelay[0] = Duration.of(unit.toNanos(delay), ChronoUnit.NANOS);
- command.run();
- return null;
- }
- };
CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryNTimes(retryQty, (int)delay.toMillis()), service);
AtomicInteger counter = new AtomicInteger(0);
Assert.assertEquals(circuitBreaker.getRetryCount(), 0);
Assert.assertFalse(circuitBreaker.close());
}
+
+ @Test
+ public void testVariousOpenRetryFails()
+ {
+ CircuitBreaker circuitBreaker = new CircuitBreaker(new RetryForever(1), service);
+ Assert.assertFalse(circuitBreaker.tryToRetry(() -> {}));
+ Assert.assertTrue(circuitBreaker.tryToOpen(() -> {}));
+ Assert.assertFalse(circuitBreaker.tryToOpen(() -> {}));
+ Assert.assertTrue(circuitBreaker.close());
+ Assert.assertFalse(circuitBreaker.close());
+ }
}
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.Timing2;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
private final CuratorFramework dummyClient = CuratorFrameworkFactory.newClient("foo", new RetryOneTime(1));
private final Timing2 timing = new Timing2();
private final Timing2 retryTiming = timing.multiple(.25);
- private final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
+ private volatile ScheduledThreadPoolExecutor service;
private static class RecordingListener implements ConnectionStateListener
{
}
}
- @AfterClass
+ @BeforeMethod
+ public void setup()
+ {
+ service = new ScheduledThreadPoolExecutor(1);
+ }
+
+ @AfterMethod
public void tearDown()
{
service.shutdownNow();
{
RecordingListener recordingListener = new RecordingListener();
TestRetryPolicy retryPolicy = new TestRetryPolicy();
- final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
+ CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
TestRetryPolicy retryPolicy = new TestRetryPolicy();
CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryPolicy, service);
- listener.stateChanged(dummyClient, ConnectionState.LOST);
- listener.stateChanged(dummyClient, ConnectionState.LOST); // second LOST ignored
+ synchronized(listener) // don't let retry policy run while we're pushing state changes
+ {
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ listener.stateChanged(dummyClient, ConnectionState.LOST); // second LOST ignored
+ }
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
Assert.assertTrue(recordingListener.stateChanges.isEmpty());
listener.stateChanged(dummyClient, ConnectionState.LOST);
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+ Assert.assertFalse(listener.isOpen());
listener.stateChanged(dummyClient, ConnectionState.LOST);
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
Assert.assertFalse(listener.isOpen());
{
RecordingListener recordingListener = new RecordingListener();
RetryPolicy retryOnce = new RetryOneTime(retryTiming.milliseconds());
- final CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
+ CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryOnce, service);
synchronized(listener) // don't let retry policy run while we're pushing state changes
{
Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
Assert.assertFalse(listener.isOpen());
}
+
+ @Test
+ public void testSuspendedToLostRatcheting() throws Exception
+ {
+ RecordingListener recordingListener = new RecordingListener();
+ RetryPolicy retryInfinite = new RetryForever(Integer.MAX_VALUE);
+ CircuitBreakingConnectionStateListener listener = new CircuitBreakingConnectionStateListener(dummyClient, recordingListener, retryInfinite, service);
+
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ Assert.assertFalse(listener.isOpen());
+ Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.RECONNECTED);
+
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ Assert.assertTrue(listener.isOpen());
+ Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.SUSPENDED);
+
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+ Assert.assertTrue(listener.isOpen());
+
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ Assert.assertEquals(timing.takeFromQueue(recordingListener.stateChanges), ConnectionState.LOST);
+ Assert.assertTrue(listener.isOpen());
+
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+ Assert.assertTrue(listener.isOpen());
+
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ listener.stateChanged(dummyClient, ConnectionState.RECONNECTED);
+ listener.stateChanged(dummyClient, ConnectionState.READ_ONLY);
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ listener.stateChanged(dummyClient, ConnectionState.SUSPENDED);
+ listener.stateChanged(dummyClient, ConnectionState.LOST);
+ Assert.assertTrue(recordingListener.stateChanges.isEmpty());
+ Assert.assertTrue(listener.isOpen());
+ }
}
public void testWithCircuitBreaker() throws Exception
{
Timing2 timing = new Timing2();
- ConnectionStateListenerDecorator factory = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.milliseconds()));
+ ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
try ( CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
- .connectionStateListenerFactory(factory)
+ .connectionStateListenerDecorator(decorator)
.connectionTimeoutMs(timing.connection())
.sessionTimeoutMs(timing.session())
.build() )
{
client.start();
AtomicInteger resetCount = new AtomicInteger(0);
- LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
+ try ( LeaderLatch latch = new LeaderLatch(client, "/foo/bar")
{
@Override
void reset() throws Exception
resetCount.incrementAndGet();
super.reset();
}
- };
- latch.start();
- Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
-
- for ( int i = 0; i < 5; ++i )
+ } )
{
- server.stop();
- server.restart();
- timing.sleepABit();
+ latch.start();
+ Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+
+ for ( int i = 0; i < 5; ++i )
+ {
+ server.stop();
+ server.restart();
+ timing.sleepABit();
+ }
+ Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
+ Assert.assertEquals(resetCount.get(), 2);
}
- Assert.assertTrue(latch.await(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
- Assert.assertEquals(resetCount.get(), 2);
}
}
h2. Notifications
Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection.
-{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take
+{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection
+disruptions. Clients can monitor these changes and take
appropriate action. These are the possible state changes:
|CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.|
* getSortedChildren: Return the children of the given path sorted by sequence number
* makePath: Given a parent path and a child node, create a combined full path
+h2. Circuit Breaking ConnectionStateListener
+
+During network outages ZooKeeper can become very noisy sending connection/disconnection events in rapid succession. Curator recipes respond to these
+messages by resetting state, etc. E.g. LeaderLatch must delete its lock node and try to recreate it in order to try to re\-obtain leadership, etc.
+
+This noisy herding can be avoided by using the circuit breaking listener decorator. When it receives ConnectionState.SUSPENDED, the circuit becomes "open"
+(based on the provided RetryPolicy) and will ignore future connection state changes until RetryPolicy timeout has elapsed. Note: however, if the connection
+goes from ConnectionState.SUSPENDED to ConnectionState.LOST the first LOST state is sent.
+
+When the circuit decorator is closed, all connection state changes are forwarded to the managed listener. When the first disconnected state is received, the
+circuit becomes open. The state change that caused the circuit to open is sent to the managed listener and the RetryPolicy will be used to get a delay amount.
+While the delay is active, the decorator will store state changes but will not forward them to the managed listener (except, however, the first time the state
+changes from SUSPENDED to LOST). When the delay elapses, if the connection has been restored, the circuit closes and forwards the new state to the managed listener.
+If the connection has not been restored, the RetryPolicy is checked again. If the RetryPolicy indicates another retry is allowed the process repeats. If, however,
+the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is
+forwarded to the managed listener.
+
+You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate
+their ConnectionStateListeners using the configured decorator. E.g.
+
+{code}
+ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...);
+CuratorFramework client = CuratorFrameworkFactory.builder()
+ ...
+ .connectionStateListenerDecorator(decorator)
+ ...
+ .build();
+{code}
+
+If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}.
+
+{code}
+CuratorFramework client ...
+ConnectionStateListener listener = ...
+ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
+
+...
+
+client.getConnectionStateListenable().addListener(decoratedListener);
+
+// later, to remove...
+client.getConnectionStateListenable().removeListener(decoratedListener);
+{code}
+
h2. Locker
Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer: