CURATOR-505 - Some refactoring and more doc
authorrandgalt <randgalt@apache.org>
Mon, 11 Feb 2019 12:46:55 +0000 (07:46 -0500)
committerrandgalt <randgalt@apache.org>
Mon, 11 Feb 2019 12:46:55 +0000 (07:46 -0500)
curator-framework/src/main/java/org/apache/curator/framework/listen/MappingListenerManager.java
curator-framework/src/main/java/org/apache/curator/framework/listen/StandardListenerManager.java
curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java

index f230da9..bd9f51a 100644 (file)
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.UnaryOperator;
 
 /**
  * Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
@@ -39,18 +38,6 @@ public class MappingListenerManager<K, V> implements ListenerManager<K, V>
     private final Function<K, V> mapper;
 
     /**
-     * Returns a new mapping container that maps to the same type
-     *
-     * @param mapper listener mapper/wrapper
-     * @return new container
-     */
-    public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
-    {
-        MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
-        return new StandardListenerManager<>(container);
-    }
-
-    /**
      * Returns a new container that wraps listeners using the given mapper
      *
      * @param mapper listener mapper/wrapper
index 8d239ca..e07fe47 100644 (file)
  */
 package org.apache.curator.framework.listen;
 
-import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.UnaryOperator;
 
 /**
  * Non mapping version of a listener container
@@ -41,6 +41,18 @@ public class StandardListenerManager<T> implements ListenerManager<T, T>
         return new StandardListenerManager<>(container);
     }
 
+    /**
+     * Returns a new mapping container that maps to the same type
+     *
+     * @param mapper listener mapper/wrapper
+     * @return new container
+     */
+    public static <T> StandardListenerManager<T> mappingStandard(UnaryOperator<T> mapper)
+    {
+        MappingListenerManager<T, T> container = new MappingListenerManager<>(mapper);
+        return new StandardListenerManager<>(container);
+    }
+
     @Override
     public void addListener(T listener)
     {
index 583b9f2..55e17c8 100644 (file)
@@ -22,7 +22,6 @@ package org.apache.curator.framework.state;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.listen.MappingListenerManager;
 import org.apache.curator.framework.listen.StandardListenerManager;
 import org.apache.curator.utils.Compatibility;
 import org.apache.curator.utils.ThreadUtils;
@@ -114,7 +113,7 @@ public class ConnectionStateManager implements Closeable
             threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
         }
         service = Executors.newSingleThreadExecutor(threadFactory);
-        listeners = MappingListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
+        listeners = StandardListenerManager.mappingStandard(listener -> listener.doNotDecorate() ? listener : connectionStateListenerDecorator.decorateListener(client, listener));
     }
 
     /**
index cd87125..8dae57b 100644 (file)
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.collect.Queues;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionStateListenerDecorator;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryForever;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TestPathChildrenCacheInCluster extends BaseClassForTests
 {
+    @Override
+    protected void createServer()
+    {
+        // do nothing
+    }
+
+    @Test
+    public void testWithCircuitBreaker() throws Exception
+    {
+        Timing timing = new Timing();
+        try ( TestingCluster cluster = new TestingCluster(3) )
+        {
+            cluster.start();
+
+            ConnectionStateListenerDecorator decorator = ConnectionStateListenerDecorator.circuitBreaking(new RetryForever(timing.multiple(2).milliseconds()));
+            Iterator<InstanceSpec> iterator = cluster.getInstances().iterator();
+            InstanceSpec client1Instance = iterator.next();
+            InstanceSpec client2Instance = iterator.next();
+            ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(100, 3);
+            try (CuratorFramework client1 = CuratorFrameworkFactory.
+                builder()
+                .connectString(client1Instance.getConnectString())
+                .retryPolicy(exponentialBackoffRetry)
+                .sessionTimeoutMs(timing.session())
+                .connectionTimeoutMs(timing.connection())
+                .connectionStateListenerDecorator(decorator)
+                .build()
+            )
+            {
+                client1.start();
+
+                try ( CuratorFramework client2 = CuratorFrameworkFactory.newClient(client2Instance.getConnectString(), timing.session(), timing.connection(), exponentialBackoffRetry) )
+                {
+                    client2.start();
+
+                    AtomicInteger refreshCount = new AtomicInteger(0);
+                    try ( PathChildrenCache cache = new PathChildrenCache(client1, "/test", true) {
+                        @Override
+                        void refresh(RefreshMode mode) throws Exception
+                        {
+                            refreshCount.incrementAndGet();
+                            super.refresh(mode);
+                        }
+                    } )
+                    {
+                        cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+                        client2.create().forPath("/test/1", "one".getBytes());
+                        client2.create().forPath("/test/2", "two".getBytes());
+                        client2.create().forPath("/test/3", "three".getBytes());
+
+                        Future<?> task = Executors.newSingleThreadExecutor().submit(() -> {
+                            try
+                            {
+                                for ( int i = 0; i < 5; ++i )
+                                {
+                                    cluster.killServer(client1Instance);
+                                    cluster.restartServer(client1Instance);
+                                    timing.sleepABit();
+                                }
+                            }
+                            catch ( Exception e )
+                            {
+                                e.printStackTrace();
+                            }
+                        });
+
+                        client2.create().forPath("/test/4", "four".getBytes());
+                        client2.create().forPath("/test/5", "five".getBytes());
+                        client2.delete().forPath("/test/4");
+                        client2.setData().forPath("/test/1", "1".getBytes());
+                        client2.create().forPath("/test/6", "six".getBytes());
+
+                        task.get();
+                        timing.sleepABit();
+
+                        Assert.assertNotNull(cache.getCurrentData("/test/1"));
+                        Assert.assertEquals(cache.getCurrentData("/test/1").getData(), "1".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/2"));
+                        Assert.assertEquals(cache.getCurrentData("/test/2").getData(), "two".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/3"));
+                        Assert.assertEquals(cache.getCurrentData("/test/3").getData(), "three".getBytes());
+                        Assert.assertNull(cache.getCurrentData("/test/4"));
+                        Assert.assertNotNull(cache.getCurrentData("/test/5"));
+                        Assert.assertEquals(cache.getCurrentData("/test/5").getData(), "five".getBytes());
+                        Assert.assertNotNull(cache.getCurrentData("/test/6"));
+                        Assert.assertEquals(cache.getCurrentData("/test/6").getData(), "six".getBytes());
+
+                        Assert.assertEquals(refreshCount.get(), 2);
+                    }
+                }
+            }
+        }
+    }
+
     @Test(enabled = false)  // this test is very flakey - it needs to be re-written at some point
     public void testMissedDelete() throws Exception
     {
index 9ae6a5d..f932ae4 100644 (file)
@@ -98,6 +98,11 @@ public class BaseClassForTests
         System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
         System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true");
 
+        createServer();
+    }
+
+    protected void createServer() throws Exception
+    {
         while ( server == null )
         {
             try