package org.apache.curator;
import com.google.common.base.Preconditions;
-import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
import org.apache.curator.connection.ConnectionHandlingPolicy;
+import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.drivers.TracerDriver;
import org.apache.curator.ensemble.EnsembleProvider;
*/
public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
- this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy());
+ this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
}
/**
*/
public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
{
- this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy());
+ this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
}
/**
*/
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
- this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new ClassicConnectionHandlingPolicy());
+ this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new StandardConnectionHandlingPolicy());
}
/**
+++ /dev/null
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.connection;
-
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.RetryLoop;
-import org.apache.curator.utils.ThreadUtils;
-import java.util.concurrent.Callable;
-
-/**
- * Emulates the pre 3.0.0 Curator connection handling
- */
-public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy
-{
- @Override
- public int getSimulatedSessionExpirationPercent()
- {
- return 0;
- }
-
- @Override
- public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
- {
- T result = null;
- RetryLoop retryLoop = client.newRetryLoop();
- while ( retryLoop.shouldContinue() )
- {
- try
- {
- client.internalBlockUntilConnectedOrTimedOut();
- result = proc.call();
- retryLoop.markComplete();
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- retryLoop.takeException(e);
- }
- }
-
- return result;
- }
-
- @Override
- public CheckTimeoutsResult checkTimeouts(Callable<String> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception
- {
- CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
- int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
- long elapsed = System.currentTimeMillis() - connectionStartMs;
- if ( elapsed >= minTimeout )
- {
- if ( hasNewConnectionString.call() != null )
- {
- result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
- }
- else
- {
- int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
- if ( elapsed > maxTimeout )
- {
- result = CheckTimeoutsResult.RESET_CONNECTION;
- }
- else
- {
- result = CheckTimeoutsResult.CONNECTION_TIMEOUT;
- }
- }
- }
-
- return result;
- }
-}
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
/**
*/
public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy
{
- private final Logger log = LoggerFactory.getLogger(getClass());
private final int expirationPercent;
public StandardConnectionHandlingPolicy()
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
+import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.ZooKeeper;
CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
RetryPolicy retryPolicy = new RetryOneTime(1);
RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
- when(curator.getConnectionHandlingPolicy()).thenReturn(new ClassicConnectionHandlingPolicy());
+ when(curator.getConnectionHandlingPolicy()).thenReturn(new StandardConnectionHandlingPolicy());
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
when(curator.newRetryLoop()).thenReturn(retryLoop);
RetryPolicy retryPolicy = new RetryOneTime(1);
RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
final CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
- when(curator.getConnectionHandlingPolicy()).thenReturn(new ClassicConnectionHandlingPolicy());
+ when(curator.getConnectionHandlingPolicy()).thenReturn(new StandardConnectionHandlingPolicy());
when(curator.getZooKeeper()).thenReturn(client);
when(curator.getRetryPolicy()).thenReturn(retryPolicy);
when(curator.newRetryLoop()).thenReturn(retryLoop);
import com.google.common.collect.ImmutableList;
import org.apache.curator.RetryPolicy;
-import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
import org.apache.curator.connection.ConnectionHandlingPolicy;
import org.apache.curator.connection.StandardConnectionHandlingPolicy;
import org.apache.curator.ensemble.EnsembleProvider;
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
private ConnectionStateErrorPolicy connectionStateErrorPolicy = new StandardConnectionStateErrorPolicy();
- private ConnectionHandlingPolicy connectionHandlingPolicy = Boolean.getBoolean("curator-use-classic-connection-handling") ? new ClassicConnectionHandlingPolicy() : new StandardConnectionHandlingPolicy();
+ private ConnectionHandlingPolicy connectionHandlingPolicy = new StandardConnectionHandlingPolicy();
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
private boolean zk34CompatibilityMode = isZK34();
* </p>
* <p>
* <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy has different behavior than the connection
- * policy handling prior to version 3.0.0. You can specify that the connection handling be the method
- * prior to 3.0.0 by passing in an instance of {@link ClassicConnectionHandlingPolicy} here or by
- * setting the command line value "curator-use-classic-connection-handling" to true (e.g. <tt>-Dcurator-use-classic-connection-handling=true</tt>).
+ * policy handling prior to version 3.0.0.
* </p>
* <p>
* Major differences from the older behavior are:
+++ /dev/null
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.apache.curator.framework.state.ConnectionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class ClassicInternalConnectionHandler implements InternalConnectionHandler
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Override
- public void checkNewConnection(CuratorFrameworkImpl client)
- {
- // NOP
- }
-
- @Override
- public void suspendConnection(CuratorFrameworkImpl client)
- {
- if ( client.setToSuspended() )
- {
- doSyncForSuspendedConnection(client, client.getZookeeperClient().getInstanceIndex());
- }
- }
-
- private void doSyncForSuspendedConnection(final CuratorFrameworkImpl client, final long instanceIndex)
- {
- // we appear to have disconnected, force a new ZK event and see if we can connect to another server
- final BackgroundOperation<String> operation = new BackgroundSyncImpl(client, null);
- OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>()
- {
- @Override
- public void retriesExhausted(OperationAndData<String> operationAndData)
- {
- // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated
- // so the pending background sync is no longer valid.
- // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost
- if ( (instanceIndex < 0) || (instanceIndex == client.getZookeeperClient().getInstanceIndex()) )
- {
- client.addStateChange(ConnectionState.LOST);
- }
- else
- {
- log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying.");
- // send -1 to signal that if it happens again, punt and mark the connection lost
- doSyncForSuspendedConnection(client, -1);
- }
- }
- };
- client.performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null, null));
- }
-}
builder.getConnectionHandlingPolicy()
);
- boolean isClassic = (builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent() == 0);
- internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
+ internalConnectionHandler = new StandardInternalConnectionHandler();
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
Assert.assertNull(states.poll(timing.multiple(.5).milliseconds(), TimeUnit.MILLISECONDS)); // there should be no other events
}
-
- @Override
- protected boolean enabledSessionExpiredStateAware()
- {
- return true;
- }
}
@Test
public void testSessionLossWithLongTimeout() throws Exception
{
-
final Timing timing = new Timing();
- //Change this to TRUE and the test will pass.
- System.setProperty("curator-use-classic-connection-handling", Boolean.FALSE.toString());
-
+
try(final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.forWaiting().milliseconds(),
timing.connection(), new RetryOneTime(1)))
{
-
final CountDownLatch connectedLatch = new CountDownLatch(1);
final CountDownLatch lostLatch = new CountDownLatch(1);
final CountDownLatch restartedLatch = new CountDownLatch(1);
server.restart();
Assert.assertTrue(timing.awaitLatch(restartedLatch));
}
- finally
- {
- System.clearProperty("curator-use-classic-connection-handling");
- }
}
@Test
}
}
- @Override
- protected boolean enabledSessionExpiredStateAware()
- {
- return true;
- }
-
private CuratorFramework newClient(SchemaSet schemaSet)
{
return CuratorFrameworkFactory.builder()
client.getConnectionStateListenable().addListener(listener1);
log.debug("Starting ZK server");
server.restart();
- if ( Boolean.getBoolean("curator-use-classic-connection-handling") )
- {
- Assert.assertEquals(listenerSequence.poll(forWaiting.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
- Assert.assertEquals(listenerSequence.poll(forWaiting.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST);
- }
Assert.assertEquals(listenerSequence.poll(forWaiting.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
log.debug("Stopping ZK server");
{
CuratorFramework client;
TreeCache cache;
- private final AtomicBoolean hadBackgroundException = new AtomicBoolean(false);
+ protected final AtomicBoolean hadBackgroundException = new AtomicBoolean(false);
private final BlockingQueue<TreeCacheEvent> events = new LinkedBlockingQueue<TreeCacheEvent>();
private final Timing timing = new Timing();
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.compatibility.KillSession2;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestTreeCache extends BaseTestTreeCache
import org.testng.IInvokedMethodListener2;
import org.testng.IRetryAnalyzer;
import org.testng.ITestContext;
-import org.testng.ITestNGMethod;
import org.testng.ITestResult;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@BeforeSuite(alwaysRun = true)
public void beforeSuite(ITestContext context)
{
- context.getSuite().addListener(new MethodListener(log, enabledSessionExpiredStateAware()));
+ context.getSuite().addListener(new MethodListener(log));
}
@BeforeMethod
}
}
- protected boolean enabledSessionExpiredStateAware()
- {
- return false;
- }
-
private static class RetryContext
{
final AtomicBoolean isRetrying = new AtomicBoolean(false);
private static class MethodListener implements IInvokedMethodListener2
{
private final Logger log;
- private final boolean sessionExpiredStateAware;
private static final String ATTRIBUTE_NAME = "__curator";
- MethodListener(Logger log, boolean sessionExpiredStateAware)
+ MethodListener(Logger log)
{
this.log = log;
- this.sessionExpiredStateAware = sessionExpiredStateAware;
}
@Override
retryContext = new RetryContext();
context.setAttribute(ATTRIBUTE_NAME, retryContext);
}
-
- if ( !sessionExpiredStateAware )
- {
- System.setProperty("curator-use-classic-connection-handling", Boolean.toString(retryContext.runVersion.get() > 0));
- log.info("curator-use-classic-connection-handling: " + Boolean.toString(retryContext.runVersion.get() > 0));
- }
}
else if ( method.isTestMethod() )
{
@Override
public void afterInvocation(IInvokedMethod method, ITestResult testResult, ITestContext context)
{
- if ( method.getTestMethod().isBeforeSuiteConfiguration() && !sessionExpiredStateAware )
- {
- for ( ITestNGMethod testMethod : context.getAllTestMethods() )
- {
- testMethod.setInvocationCount(2);
- }
- }
-
if ( method.isTestMethod() )
{
RetryContext retryContext = (RetryContext)context.getAttribute(ATTRIBUTE_NAME);
}
else
{
- System.clearProperty("curator-use-classic-connection-handling");
if ( testResult.isSuccess() || (testResult.getStatus() == ITestResult.FAILURE) )
{
retryContext.isRetrying.set(false);