CURATOR-505 - refactoring/refining a new listener container that doesn't rely on...
authorrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 20:35:10 +0000 (15:35 -0500)
committerrandgalt <randgalt@apache.org>
Thu, 7 Feb 2019 20:35:10 +0000 (15:35 -0500)
curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerContainer.java
curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java [new file with mode: 0644]
curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java [moved from curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerContainer.java with 52% similarity]
curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java [new file with mode: 0644]
curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreaker.java
curator-framework/src/main/java/org/apache/curator/framework/state/CircuitBreakingConnectionStateListener.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateListenerDecorator.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreaker.java
curator-framework/src/test/java/org/apache/curator/framework/state/TestCircuitBreakingConnectionStateListener.java

index 283a093..a5c08ff 100644 (file)
@@ -498,9 +498,8 @@ public class CuratorFrameworkFactory
         }
 
         /**
-         * Sets the connection state listener decorator. Curator recipes (and proper client code)
-         * will always decorate connection state listeners via this decorator. For example,
-         * you can set use {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
+         * Sets the connection state listener decorator. For example,
+         * you can set {@link org.apache.curator.framework.state.CircuitBreakingConnectionStateListener}s
          * via this mechanism by using {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy)}
          * or {@link org.apache.curator.framework.state.ConnectionStateListenerDecorator#circuitBreaking(org.apache.curator.RetryPolicy, java.util.concurrent.ScheduledExecutorService)}
          *
index 60ae501..9139439 100644 (file)
@@ -29,7 +29,11 @@ import java.util.concurrent.Executor;
 
 /**
  * Abstracts an object that has listeners
+ *
+ * @deprecated Prefer {@link MappingListenerManager} and
+ * {@link StandardListenerManager}
  */
+@Deprecated
 public class ListenerContainer<T> implements Listenable<T>
 {
     private final Logger                        log = LoggerFactory.getLogger(getClass());
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/ListenerManager.java
new file mode 100644 (file)
index 0000000..cab0426
--- /dev/null
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.listen;
+
+import java.util.function.Consumer;
+
+public interface ListenerManager<K, V> extends Listenable<K>
+{
+    /**
+     * Remove all listeners
+     */
+    void clear();
+
+    /**
+     * Return the number of listeners
+     *
+     * @return number
+     */
+    int size();
+
+    /**
+     * Utility - apply the given function to each listener. The function receives
+     * the listener as an argument.
+     *
+     * @param function function to call for each listener
+     */
+    void forEach(Consumer<V> function);
+}
@@ -1,33 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.listen;
 
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.UnaryOperator;
 
 /**
  * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
  * doesn't leak Guava's internals and also supports mapping/wrapping of listeners
  */
-public class MappingListenerContainer<K, V> implements Listenable<K>
+public class MappingListenerManager<K, V> implements ListenerManager<K, V>
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final Map<K, ListenerEntry<V>> listeners = Maps.newConcurrentMap();
+    private final Map<K, ListenerEntry<V>> listeners = new ConcurrentHashMap<>();
     private final Function<K, V> mapper;
 
     /**
-     * Returns a new standard version that does no mapping
+     * Returns a new mapping container that maps to the same type
      *
+     * @param mapper listener mapper/wrapper
      * @return new container
      */
-    public static <T> MappingListenerContainer<T, T> nonMapping()
+    public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
     {
-        return new MappingListenerContainer<>(Function.identity());
+        MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
+        return new StandardListenerManager<>(container);
     }
 
     /**
@@ -36,22 +56,22 @@ public class MappingListenerContainer<K, V> implements Listenable<K>
      * @param mapper listener mapper/wrapper
      * @return new container
      */
-    public static <K, V> MappingListenerContainer<K, V> mapping(Function<K, V> mapper)
+    public static <K, V> ListenerManager<K, V> mapping(Function<K, V> mapper)
     {
-        return new MappingListenerContainer<>(mapper);
+        return new MappingListenerManager<>(mapper);
     }
 
     @Override
     public void addListener(K listener)
     {
-        addListener(listener, MoreExecutors.directExecutor());
+        addListener(listener, Runnable::run);
     }
 
     @Override
     public void addListener(K listener, Executor executor)
     {
         V mapped = mapper.apply(listener);
-        listeners.put(listener, new ListenerEntry<V>(mapped, executor));
+        listeners.put(listener, new ListenerEntry<>(mapped, executor));
     }
 
     @Override
@@ -63,30 +83,19 @@ public class MappingListenerContainer<K, V> implements Listenable<K>
         }
     }
 
-    /**
-     * Remove all listeners
-     */
+    @Override
     public void clear()
     {
         listeners.clear();
     }
 
-    /**
-     * Return the number of listeners
-     *
-     * @return number
-     */
+    @Override
     public int size()
     {
         return listeners.size();
     }
 
-    /**
-     * Utility - apply the given function to each listener. The function receives
-     * the listener as an argument.
-     *
-     * @param function function to call for each listener
-     */
+    @Override
     public void forEach(Consumer<V> function)
     {
         for ( ListenerEntry<V> entry : listeners.values() )
@@ -105,7 +114,7 @@ public class MappingListenerContainer<K, V> implements Listenable<K>
         }
     }
 
-    private MappingListenerContainer(Function<K, V> mapper)
+    MappingListenerManager(Function<K, V> mapper)
     {
         this.mapper = mapper;
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java b/curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
new file mode 100644 (file)
index 0000000..8b60ac1
--- /dev/null
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.listen;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Non mapping version of a listener container
+ */
+public class StandardListenerManager<T> implements ListenerManager<T, T>
+{
+    private final ListenerManager<T, T> container;
+
+    /**
+     * Returns a new standard listener container
+     *
+     * @return new container
+     */
+    public static <T> StandardListenerManager<T> standard()
+    {
+        MappingListenerManager<T, T> container = new MappingListenerManager<>(Function.identity());
+        return new StandardListenerManager<>(container);
+    }
+
+    public StandardListenerManager(ListenerManager<T, T> container)
+    {
+        this.container = Objects.requireNonNull(container, "container cannot be null");
+    }
+
+    @Override
+    public void addListener(T listener)
+    {
+        container.addListener(listener);
+    }
+
+    @Override
+    public void addListener(T listener, Executor executor)
+    {
+        container.addListener(listener, executor);
+    }
+
+    @Override
+    public void removeListener(T listener)
+    {
+        container.removeListener(listener);
+    }
+
+    @Override
+    public void clear()
+    {
+        container.clear();
+    }
+
+    @Override
+    public int size()
+    {
+        return container.size();
+    }
+
+    @Override
+    public void forEach(Consumer<T> function)
+    {
+        container.forEach(function);
+    }
+}
index 504edbc..78dc9af 100644 (file)
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
index dba651a..24eba01 100644 (file)
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
index 0ac808b..b95c4b3 100644 (file)
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;
index 3654f61..583b9f2 100644 (file)
@@ -22,7 +22,8 @@ package org.apache.curator.framework.state;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.MappingListenerContainer;
+import org.apache.curator.framework.listen.MappingListenerManager;
+import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class ConnectionStateManager implements Closeable
     private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
-    private final MappingListenerContainer<ConnectionStateListener, ConnectionStateListener> listeners;
+    private final StandardListenerManager<ConnectionStateListener> listeners;
 
     // guarded by sync
     private ConnectionState currentConnectionState;
@@ -113,7 +114,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
-        listeners = MappingListenerContainer.mapping(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+        listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
     }
 
     /**
index e2daa96..37833b9 100644 (file)
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.state;
 
 import org.apache.curator.retry.RetryForever;
index 1712eed..5c80a9a 100644 (file)
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.state;
 
 import org.apache.curator.RetryPolicy;