CURATOR-505 - decoration of ConnectionStateListeners is now automatic (a backdoor...
authorrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 17:58:41 +0000 (12:58 -0500)
committerrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 17:58:41 +0000 (12:58 -0500)
17 files changed:
curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java [new file with mode: 0644]
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListener.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
src/site/confluence/errors.confluence
src/site/confluence/utilities.confluence

index 2657781..3737faa 100644 (file)
@@ -30,7 +30,6 @@ import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -358,23 +357,4 @@ public interface CuratorFramework extends Closeable
      * @since 4.1.0
      */
     CompletableFuture<Void> runSafe(Runnable runnable);
-
-    /**
-     * Uses the configured {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}
-     * to decorate the given listener. You should always decorate connection state listeners via
-     * this method. See the Curator recipes for examples.
-     *
-     * @param actual listener to decorate
-     * @return decorated listener
-     */
-    ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual);
-
-    /**
-     * Returns a facade of the current instance that uses the given connection state listener
-     * decorator instead of the configured one
-     *
-     * @param newDecorator decorator to use
-     * @return facade
-     */
-    CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator);
 }
index f210021..d9c3424 100644 (file)
@@ -41,7 +41,6 @@ import org.apache.curator.framework.schema.SchemaSet;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
 import org.apache.curator.framework.state.ConnectionStateManager;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
@@ -91,7 +90,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final SchemaSet schemaSet;
     private final boolean zk34CompatibilityMode;
     private final Executor runSafeService;
-    private final ConnectionStateListenerDecorator connectionStateListenerDecorator;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -141,7 +139,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
+        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -149,7 +147,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
         schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
         zk34CompatibilityMode = builder.isZk34CompatibilityMode();
-        connectionStateListenerDecorator = builder.getConnectionStateListenerDecorator();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -236,11 +233,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
     {
-        this(parent, parent.connectionStateListenerDecorator);
-    }
-
-    private CuratorFrameworkImpl(CuratorFrameworkImpl parent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
-    {
         client = parent.client;
         listeners = parent.listeners;
         unhandledErrorListeners = parent.unhandledErrorListeners;
@@ -265,7 +257,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
         runSafeService = parent.runSafeService;
-        this.connectionStateListenerDecorator = connectionStateListenerDecorator;
     }
 
     @Override
@@ -334,6 +325,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
                         logAsErrorConnectionErrors.set(true);
                     }
                 }
+
+                @Override
+                public boolean doNotDecorate()
+                {
+                    return true;
+                }
             };
 
             this.getConnectionStateListenable().addListener(listener);
@@ -598,18 +595,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return schemaSet;
     }
 
-    @Override
-    public ConnectionStateListener decorateConnectionStateListener(ConnectionStateListener actual)
-    {
-        return connectionStateListenerDecorator.decorateListener(this, actual);
-    }
-
-    @Override
-    public CuratorFramework usingConnectionStateListenerDecorator(ConnectionStateListenerDecorator newDecorator)
-    {
-        return new CuratorFrameworkImpl(this, newDecorator);
-    }
-
     ACLProvider getAclProvider()
     {
         return aclProvider;
index 7d8fe19..8ca63f6 100644 (file)
@@ -72,6 +72,12 @@ public class EnsembleTracker implements Closeable, CuratorWatcher
                 }
             }
         }
+
+        @Override
+        public boolean doNotDecorate()
+        {
+            return true;
+        }
     };
 
     private enum State
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java
new file mode 100644 (file)
index 0000000..3a37ecb
--- /dev/null
@@ -0,0 +1,112 @@
+package org.apache.curator.framework.listen;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
+ * doesn't leak Guava's internals and also supports mapping/wrapping of listeners
+ */
+public class MappingListenerContainer<K, V> implements Listenable<K>
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Map<K, ListenerEntry<V>> listeners = Maps.newConcurrentMap();
+    private final Function<K, V> mapper;
+
+    /**
+     * Returns a new standard version that does no mapping
+     *
+     * @return new container
+     */
+    public static <T> MappingListenerContainer<T, T> nonMapping()
+    {
+        return new MappingListenerContainer<>(Function.identity());
+    }
+
+    /**
+     * Returns a new container that wraps listeners using the given mapper
+     *
+     * @param mapper listener mapper/wrapper
+     * @return new container
+     */
+    public static <K, V> MappingListenerContainer<K, V> mapping(Function<K, V> mapper)
+    {
+        return new MappingListenerContainer<>(mapper);
+    }
+
+    @Override
+    public void addListener(K listener)
+    {
+        addListener(listener, MoreExecutors.directExecutor());
+    }
+
+    @Override
+    public void addListener(K listener, Executor executor)
+    {
+        V mapped = mapper.apply(listener);
+        listeners.put(listener, new ListenerEntry<V>(mapped, executor));
+    }
+
+    @Override
+    public void removeListener(K listener)
+    {
+        if ( listener != null )
+        {
+            listeners.remove(listener);
+        }
+    }
+
+    /**
+     * Remove all listeners
+     */
+    public void clear()
+    {
+        listeners.clear();
+    }
+
+    /**
+     * Return the number of listeners
+     *
+     * @return number
+     */
+    public int size()
+    {
+        return listeners.size();
+    }
+
+    /**
+     * Utility - apply the given function to each listener. The function receives
+     * the listener as an argument.
+     *
+     * @param function function to call for each listener
+     */
+    public void forEach(Consumer<V> function)
+    {
+        for ( ListenerEntry<V> entry : listeners.values() )
+        {
+            entry.executor.execute(() -> {
+                try
+                {
+                    function.accept(entry.listener);
+                }
+                catch ( Throwable e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
+                }
+            });
+        }
+    }
+
+    private MappingListenerContainer(Function<K, V> mapper)
+    {
+        this.mapper = mapper;
+    }
+}
index 075e6ec..71635d0 100644 (file)
@@ -28,5 +28,18 @@ public interface ConnectionStateListener
      * @param client the client
      * @param newState the new state
      */
-    public void stateChanged(CuratorFramework client, ConnectionState newState);
+    void stateChanged(CuratorFramework client, ConnectionState newState);
+
+    /**
+     * Normally, ConnectionStateListeners are decorated via the configured
+     * {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator}. For certain
+     * critical cases, however, this is not desired. If your listener returns <code>true</code>
+     * for doNotDecorate(), it will not be passed through the decorator.
+     *
+     * @return true/false
+     */
+    default boolean doNotDecorate()
+    {
+        return false;
+    }
 }
index 5e28b3d..3654f61 100644 (file)
 
 package org.apache.curator.framework.state;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.MappingListenerContainer;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -68,10 +68,10 @@ public class ConnectionStateManager implements Closeable
     private final CuratorFramework client;
     private final int sessionTimeoutMs;
     private final int sessionExpirationPercent;
-    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final MappingListenerContainer<ConnectionStateListener, ConnectionStateListener> listeners;
 
     // guarded by sync
     private ConnectionState currentConnectionState;
@@ -93,6 +93,18 @@ public class ConnectionStateManager implements Closeable
      */
     public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
     {
+        this(client, threadFactory, sessionTimeoutMs, sessionExpirationPercent, ConnectionStateListenerDecorator.standard);
+    }
+
+    /**
+     * @param client        the client
+     * @param threadFactory thread factory to use or null for a default
+     * @param sessionTimeoutMs the ZK session timeout in milliseconds
+     * @param sessionExpirationPercent percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
+     * @param connectionStateListenerDecorator the decorator to use
+     */
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator)
+    {
         this.client = client;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.sessionExpirationPercent = sessionExpirationPercent;
@@ -101,6 +113,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
+        listeners = MappingListenerContainer.mapping(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
     }
 
     /**
@@ -138,8 +151,9 @@ public class ConnectionStateManager implements Closeable
      * Return the listenable
      *
      * @return listenable
+     * @since 4.2.0 return type has changed from ListenerContainer to Listenable
      */
-    public ListenerContainer<ConnectionStateListener> getListenable()
+    public Listenable<ConnectionStateListener> getListenable()
     {
         return listeners;
     }
@@ -263,18 +277,7 @@ public class ConnectionStateManager implements Closeable
                         log.warn("There are no ConnectionStateListeners registered.");
                     }
 
-                    listeners.forEach
-                        (
-                            new Function<ConnectionStateListener, Void>()
-                            {
-                                @Override
-                                public Void apply(ConnectionStateListener listener)
-                                {
-                                    listener.stateChanged(client, newState);
-                                    return null;
-                                }
-                            }
-                        );
+                    listeners.forEach(listener -> listener.stateChanged(client, newState));
                 }
                 else if ( sessionExpirationPercent > 0 )
                 {
index 1ba88c3..9687e1b 100644 (file)
@@ -64,7 +64,32 @@ public class NodeCache implements Closeable
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
     private final AtomicBoolean isConnected = new AtomicBoolean(true);
-    private volatile ConnectionStateListener connectionStateListener;
+    private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
+            {
+                if ( isConnected.compareAndSet(false, true) )
+                {
+                    try
+                    {
+                        reset();
+                    }
+                    catch ( Exception e )
+                    {
+                        ThreadUtils.checkInterrupted(e);
+                        log.error("Trying to reset after reconnection", e);
+                    }
+                }
+            }
+            else
+            {
+                isConnected.set(false);
+            }
+        }
+    };
 
     private Watcher watcher = new Watcher()
     {
@@ -118,8 +143,6 @@ public class NodeCache implements Closeable
         this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.dataIsCompressed = dataIsCompressed;
-
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     public CuratorFramework getClient()
@@ -173,7 +196,7 @@ public class NodeCache implements Closeable
             // has something to do with Guava's cache and circular references
             connectionStateListener = null;
             watcher = null;
-        }
+        }        
     }
 
     /**
@@ -325,7 +348,7 @@ public class NodeCache implements Closeable
             }
         }
     }
-
+    
     /**
      * Default behavior is just to log the exception
      *
@@ -335,27 +358,4 @@ public class NodeCache implements Closeable
     {
         log.error("", e);
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
-        {
-            if ( isConnected.compareAndSet(false, true) )
-            {
-                try
-                {
-                    reset();
-                }
-                catch ( Exception e )
-                {
-                    ThreadUtils.checkInterrupted(e);
-                    log.error("Trying to reset after reconnection", e);
-                }
-            }
-        }
-        else
-        {
-            isConnected.set(false);
-        }
-    }
 }
index 14608ba..bdc73cc 100644 (file)
@@ -128,7 +128,14 @@ public class PathChildrenCache implements Closeable
     @VisibleForTesting
     volatile Exchanger<Object> rebuildTestExchanger;
 
-    private volatile ConnectionStateListener connectionStateListener;
+    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            handleStateChange(newState);
+        }
+    };
     public static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
 
     /**
@@ -218,7 +225,6 @@ public class PathChildrenCache implements Closeable
         this.dataIsCompressed = dataIsCompressed;
         this.executorService = executorService;
         ensureContainers = new EnsureContainers(client, path);
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
index a3b1d23..f42c1d5 100644 (file)
@@ -534,7 +534,15 @@ public class TreeCache implements Closeable
     private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
     private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
     private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
-    private final ConnectionStateListener connectionStateListener;
+
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            handleStateChange(newState);
+        }
+    };
 
     static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache");
 
@@ -578,7 +586,6 @@ public class TreeCache implements Closeable
         this.maxDepth = maxDepth;
         this.disableZkWatches = disableZkWatches;
         this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
index 22cf3af..bb8aa73 100644 (file)
@@ -74,7 +74,15 @@ public class LeaderLatch implements Closeable
     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
     private final CloseMode closeMode;
     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
-    private final ConnectionStateListener listener;
+
+    private final ConnectionStateListener listener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            handleStateChange(newState);
+        }
+    };
 
     private static final String LOCK_NAME = "latch-";
 
@@ -141,7 +149,6 @@ public class LeaderLatch implements Closeable
         this.latchPath = PathUtils.validatePath(latchPath);
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
-        listener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
index 108d66e..0bb448a 100644 (file)
@@ -26,7 +26,6 @@ import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
@@ -69,7 +68,6 @@ public class LeaderSelector implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final LeaderSelectorListener listener;
-    private final ConnectionStateListener connectionStateListener;
     private final CloseableExecutorService executorService;
     private final InterProcessMutex mutex;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -147,8 +145,6 @@ public class LeaderSelector implements Closeable
         this.listener = new WrappedListener(this, listener);
         hasLeadership = false;
 
-        connectionStateListener = client.decorateConnectionStateListener(listener);
-
         this.executorService = executorService;
         mutex = new InterProcessMutex(client, leaderPath)
         {
@@ -219,7 +215,7 @@ public class LeaderSelector implements Closeable
         Preconditions.checkState(!executorService.isShutdown(), "Already started");
         Preconditions.checkState(!hasLeadership, "Already has leadership");
 
-        client.getConnectionStateListenable().addListener(connectionStateListener);
+        client.getConnectionStateListenable().addListener(listener);
         requeue();
     }
 
@@ -275,7 +271,7 @@ public class LeaderSelector implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
 
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
+        client.getConnectionStateListenable().removeListener(listener);
         executorService.close();
         ourTask.set(null);
     }
index 293f46e..81e8dd9 100644 (file)
@@ -145,7 +145,17 @@ public class PersistentNode implements Closeable
             }
         }
     };
-    private final ConnectionStateListener connectionStateListener;
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework dummy, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.RECONNECTED) && isActive() )
+            {
+                createNode();
+            }
+        }
+    };
 
     @VisibleForTesting
     volatile CountDownLatch debugCreateNodeLatch = null;
@@ -203,7 +213,6 @@ public class PersistentNode implements Closeable
         };
 
         this.data.set(Arrays.copyOf(data, data.length));
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     private void processBackgroundCallbackClosedState(CuratorEvent event)
@@ -545,12 +554,4 @@ public class PersistentNode implements Closeable
     {
         return authFailure.get();
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        if ( (newState == ConnectionState.RECONNECTED) && isActive() )
-        {
-            createNode();
-        }
-    }
 }
index 5f3e183..5d7abce 100644 (file)
@@ -73,7 +73,26 @@ public class SharedValue implements Closeable, SharedValueReader
         }
     };
 
-    private final ConnectionStateListener connectionStateListener;
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            notifyListenerOfStateChanged(newState);
+            if ( newState.isConnected() )
+            {
+                try
+                {
+                    readValueAndNotifyListenersInBackground();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    log.error("Could not read value after reconnect", e);
+                }
+            }
+        }
+    };
 
     private enum State
     {
@@ -94,7 +113,6 @@ public class SharedValue implements Closeable, SharedValueReader
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         this.watcher = new SharedValueCuratorWatcher();
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     @VisibleForTesting
@@ -106,7 +124,6 @@ public class SharedValue implements Closeable, SharedValueReader
         // inject watcher for testing
         this.watcher = watcher;
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     @Override
@@ -317,21 +334,4 @@ public class SharedValue implements Closeable, SharedValueReader
                 }
             );
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        notifyListenerOfStateChanged(newState);
-        if ( newState.isConnected() )
-        {
-            try
-            {
-                readValueAndNotifyListenersInBackground();
-            }
-            catch ( Exception e )
-            {
-                ThreadUtils.checkInterrupted(e);
-                log.error("Could not read value after reconnect", e);
-            }
-        }
-    }
 }
index 4270116..d1a31ad 100644 (file)
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.curator.x.discovery.details;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -24,15 +23,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -47,16 +45,17 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener
 {
-    private final ListenerContainer<ServiceCacheListener> listenerContainer = new ListenerContainer<ServiceCacheListener>();
-    private final ServiceDiscoveryImpl<T> discovery;
-    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final PathChildrenCache cache;
-    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
-    private final ConcurrentMap<ServiceCacheListener, ConnectionStateListener> connectionStateListeners = Maps.newConcurrentMap();
+    private final ListenerContainer<ServiceCacheListener>           listenerContainer = new ListenerContainer<ServiceCacheListener>();
+    private final ServiceDiscoveryImpl<T>                           discovery;
+    private final AtomicReference<State>                            state = new AtomicReference<State>(State.LATENT);
+    private final PathChildrenCache                                 cache;
+    private final ConcurrentMap<String, ServiceInstance<T>>         instances = Maps.newConcurrentMap();
 
     private enum State
     {
-        LATENT, STARTED, STOPPED
+        LATENT,
+        STARTED,
+        STOPPED
     }
 
     private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
@@ -124,15 +123,18 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     {
         Preconditions.checkState(state.compareAndSet(State.STARTED, State.STOPPED), "Already closed or has not been started");
 
-        listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
-        {
-            @Override
-            public Void apply(ServiceCacheListener listener)
-            {
-                discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener));
-                return null;
-            }
-        });
+        listenerContainer.forEach
+            (
+                new Function<ServiceCacheListener, Void>()
+                {
+                    @Override
+                    public Void apply(ServiceCacheListener listener)
+                    {
+                        discovery.getClient().getConnectionStateListenable().removeListener(listener);
+                        return null;
+                    }
+                }
+            );
         listenerContainer.clear();
 
         CloseableUtils.closeQuietly(cache);
@@ -144,56 +146,59 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
     public void addListener(ServiceCacheListener listener)
     {
         listenerContainer.addListener(listener);
-        discovery.getClient().getConnectionStateListenable().addListener(wrap(listener));
+        discovery.getClient().getConnectionStateListenable().addListener(listener);
     }
 
     @Override
     public void addListener(ServiceCacheListener listener, Executor executor)
     {
         listenerContainer.addListener(listener, executor);
-        discovery.getClient().getConnectionStateListenable().addListener(wrap(listener), executor);
+        discovery.getClient().getConnectionStateListenable().addListener(listener, executor);
     }
 
     @Override
     public void removeListener(ServiceCacheListener listener)
     {
         listenerContainer.removeListener(listener);
-        discovery.getClient().getConnectionStateListenable().removeListener(unwrap(listener));
+        discovery.getClient().getConnectionStateListenable().removeListener(listener);
     }
 
     @Override
     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
     {
-        boolean notifyListeners = false;
+        boolean         notifyListeners = false;
         switch ( event.getType() )
         {
-        case CHILD_ADDED:
-        case CHILD_UPDATED:
-        {
-            addInstance(event.getData(), false);
-            notifyListeners = true;
-            break;
-        }
+            case CHILD_ADDED:
+            case CHILD_UPDATED:
+            {
+                addInstance(event.getData(), false);
+                notifyListeners = true;
+                break;
+            }
 
-        case CHILD_REMOVED:
-        {
-            instances.remove(instanceIdFromData(event.getData()));
-            notifyListeners = true;
-            break;
-        }
+            case CHILD_REMOVED:
+            {
+                instances.remove(instanceIdFromData(event.getData()));
+                notifyListeners = true;
+                break;
+            }
         }
 
         if ( notifyListeners )
         {
-            listenerContainer.forEach(new Function<ServiceCacheListener, Void>()
-            {
-                @Override
-                public Void apply(ServiceCacheListener listener)
+            listenerContainer.forEach
+            (
+                new Function<ServiceCacheListener, Void>()
                 {
-                    listener.cacheChanged();
-                    return null;
+                    @Override
+                    public Void apply(ServiceCacheListener listener)
+                    {
+                        listener.cacheChanged();
+                        return null;
+                    }
                 }
-            });
+            );
         }
     }
 
@@ -204,8 +209,8 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
 
     private void addInstance(ChildData childData, boolean onlyIfAbsent) throws Exception
     {
-        String instanceId = instanceIdFromData(childData);
-        ServiceInstance<T> serviceInstance = discovery.getSerializer().deserialize(childData.getData());
+        String                  instanceId = instanceIdFromData(childData);
+        ServiceInstance<T>      serviceInstance = discovery.getSerializer().deserialize(childData.getData());
         if ( onlyIfAbsent )
         {
             instances.putIfAbsent(instanceId, serviceInstance);
@@ -216,16 +221,4 @@ public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheLi
         }
         cache.clearDataBytes(childData.getPath(), childData.getStat().getVersion());
     }
-
-    private ConnectionStateListener wrap(ServiceCacheListener listener)
-    {
-        ConnectionStateListener wrapped = discovery.getClient().decorateConnectionStateListener(listener);
-        connectionStateListeners.put(listener, wrapped);
-        return wrapped;
-    }
-
-    private ConnectionStateListener unwrap(ServiceCacheListener listener)
-    {
-        return connectionStateListeners.remove(listener);
-    }
 }
index 2e10095..476705c 100644 (file)
@@ -65,7 +65,29 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
     private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
     private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
     private final boolean watchInstances;
-    private final ConnectionStateListener connectionStateListener;
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) )
+            {
+                try
+                {
+                    log.debug("Re-registering due to reconnection");
+                    reRegisterServices();
+                }
+                catch (InterruptedException ex)
+                {
+                    Thread.currentThread().interrupt();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Could not re-register instances after reconnection", e);
+                }
+            }
+        }
+    };
 
     private static class Entry<T>
     {
@@ -97,7 +119,6 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             entry.cache = makeNodeCache(thisInstance);
             services.put(thisInstance.getId(), entry);
         }
-        connectionStateListener = client.decorateConnectionStateListener((__, newState) -> handleStateChange(newState));
     }
 
     /**
@@ -509,24 +530,4 @@ public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
             }
         }
     }
-
-    private void handleStateChange(ConnectionState newState)
-    {
-        if ( (newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED) )
-        {
-            try
-            {
-                log.debug("Re-registering due to reconnection");
-                reRegisterServices();
-            }
-            catch (InterruptedException ex)
-            {
-                Thread.currentThread().interrupt();
-            }
-            catch ( Exception e )
-            {
-                log.error("Could not re-register instances after reconnection", e);
-            }
-        }
-    }
 }
index 97f23fd..b4f6643 100644 (file)
@@ -19,8 +19,7 @@ in a retry mechanism. Thus, the following guarantees can be made:
 h2. Notifications
 Curator exposes several listenable interfaces for clients to monitor the state of the ZooKeeper connection.
 
-{{ConnectionStateListener}} (note see [[Utilities|utilities.html]] for details on properly decorating listeners) is called when there are connection
-disruptions. Clients can monitor these changes and take
+{{ConnectionStateListener}} is called when there are connection disruptions. Clients can monitor these changes and take
 appropriate action. These are the possible state changes:
 
 |CONNECTED|Sent for the first successful connection to the server. NOTE: You will only get one of these messages for any CuratorFramework instance.|
index 1971c3c..720d8d9 100644 (file)
@@ -31,8 +31,7 @@ If the connection has not been restored, the RetryPolicy is checked again. If th
 the RetryPolicy indicates that retries are exhausted then the circuit closes \- if the current state is different than the state that caused the circuit to open it is
 forwarded to the managed listener.
 
-You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. All Curator recipes will decorate
-their ConnectionStateListeners using the configured decorator. E.g.
+You can enable the Circuit Breaking ConnectionStateListener during creation of your CuratorFramework instance. E.g.
 
 {code}
 ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(...);
@@ -43,21 +42,6 @@ CuratorFramework client = CuratorFrameworkFactory.builder()
     .build();
 {code}
 
-If you are setting a ConnectionStateListener you should always "decorate" it by calling {{decorateConnectionStateListener()}}.
-
-{code}
-CuratorFramework client ...
-ConnectionStateListener listener = ...
-ConnectionStateListener decoratedListener = client.decorateConnectionStateListener(listener);
-
-...
-
-client.getConnectionStateListenable().addListener(decoratedListener);
-
-// later, to remove...
-client.getConnectionStateListenable().removeListener(decoratedListener);
-{code}
-
 h2. Locker
 
 Curator's Locker uses Java 7's try\-with\-resources feature to making using Curator locks safer: