CURATOR-495
authorrandgalt <randgalt@apache.org>
Fri, 14 Dec 2018 01:34:40 +0000 (20:34 -0500)
committerrandgalt <randgalt@apache.org>
Fri, 14 Dec 2018 01:34:40 +0000 (20:34 -0500)
Fixes race in many Curator recipes whereby a pattern was used that called "notifyAll" in a synchronized block in response to a ZooKeeper watcher callback. This created a race and possible deadlock if the recipe instance was already in a synchronized block. This would result in the ZK event thread getting blocked which would prevent ZK connections from getting repaired. This change adds a new executor (available from CuratorFramework) that can be used to do the sync/notify so that ZK's event thread is not blocked.

curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java

index bf6167c..02c458a 100644 (file)
@@ -25,16 +25,14 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
 import org.apache.curator.framework.api.transaction.TransactionOp;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.schema.SchemaSet;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
@@ -269,7 +267,7 @@ public interface CuratorFramework extends Closeable
      * Call this method on watchers you are no longer interested in.
      *
      * @param watcher the watcher
-     * 
+     *
      * @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references
      * when they are no longer used. If you write your own recipe, follow the example of Curator
      * recipes and use {@link #newWatcherRemoveCuratorFramework} calling {@link WatcherRemoveCuratorFramework#removeWatchers()}
@@ -277,7 +275,7 @@ public interface CuratorFramework extends Closeable
      */
     @Deprecated
     public void clearWatcherReferences(Watcher watcher);
-        
+
     /**
      * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
      * @param maxWaitTime The maximum wait time. Specify a value &lt;= 0 to wait indefinitely
@@ -286,7 +284,7 @@ public interface CuratorFramework extends Closeable
      * @throws InterruptedException If interrupted while waiting
      */
     public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
-    
+
     /**
      * Block until a connection to ZooKeeper is available. This method will not return until a
      * connection is available or it is interrupted, in which case an InterruptedException will
@@ -331,4 +329,29 @@ public interface CuratorFramework extends Closeable
      * @return true/false
      */
     boolean isZk34CompatibilityMode();
+
+    /**
+     * Calls {@link Object#notifyAll()} on the given object after first synchronizing on it.
+     * The synchronize/notify work is not performed by the calling thread; it is wrapped in a
+     * {@link Runnable} and handed to {@link #runSafe(Runnable)}.
+     *
+     * @param monitorHolder object to sync on and notify
+     * @since 4.1.0
+     */
+    default void postSafeNotify(Object monitorHolder)
+    {
+        runSafe(() -> {
+            synchronized(monitorHolder) {
+                monitorHolder.notifyAll();
+            }
+        });
+    }
+
+    /**
+     * Curator (and user) recipes can use this to run notifyAll
+     * and other blocking calls that might normally block ZooKeeper's event thread.
+     *
+     * @param runnable proc to call from a safe internal thread
+     * @since 4.1.0
+     */
+    void runSafe(Runnable runnable);
 }
index f3daeab..86fbfce 100644 (file)
@@ -47,6 +47,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.curator.CuratorZookeeperClient;
@@ -149,6 +150,8 @@ public class CuratorFrameworkFactory
         private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
         private boolean zk34CompatibilityMode = isZK34();
         private int waitForShutdownTimeoutMs = 0;
+        private Executor runSafeService = null;
+
         /**
          * Apply the current values and build a new CuratorFramework
          *
@@ -189,7 +192,7 @@ public class CuratorFrameworkFactory
 
         /**
          * Add connection authorization
-         * 
+         *
          * Subsequent calls to this method overwrite the prior calls.
          *
          * @param scheme the scheme
@@ -474,6 +477,28 @@ public class CuratorFrameworkFactory
             return this;
         }
 
+        /**
+         * Curator (and user) recipes will use this executor to call notifyAll
+         * and other blocking calls that might normally block ZooKeeper's event thread.
+         * By default, an executor is allocated internally using the provided (or default)
+         * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method
+         * to set a custom executor.
+         *
+         * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc;
+         *                       if {@code null} (the default) an executor is allocated internally
+         * @return this
+         * @since 4.1.0
+         */
+        public Builder runSafeService(Executor runSafeService)
+        {
+            this.runSafeService = runSafeService;
+            // fluent setter: must hand back the builder so calls can be chained
+            return this;
+        }
+
+        public Executor getRunSafeService()
+        {
+            return runSafeService;
+        }
+
         public ACLProvider getAclProvider()
         {
             return aclProvider;
index 1ae6a5e..34002a0 100644 (file)
@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -96,6 +97,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final EnsembleTracker ensembleTracker;
     private final SchemaSet schemaSet;
     private final boolean zk34CompatibilityMode;
+    private final Executor runSafeService;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -163,6 +165,22 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespaceFacadeCache = new NamespaceFacadeCache(this);
 
         ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
+
+        runSafeService = makeRunSafeService(builder);
+    }
+
+    /**
+     * Chooses the executor used by {@link #runSafe(Runnable)}.
+     * If the builder carries a non-null executor it is returned as-is. Otherwise a
+     * single-thread executor is created using the builder's thread factory, or, when the
+     * builder has none, one obtained from {@code ThreadUtils.newThreadFactory("SafeNotifyService")}.
+     *
+     * NOTE(review): the internally created executor is only held as an {@link Executor};
+     * no shutdown of it is visible in this change - confirm it is released when the
+     * framework is closed.
+     *
+     * @param builder builder supplying the optional executor and thread factory
+     * @return the executor to use for runSafe calls
+     */
+    private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder)
+    {
+        if ( builder.getRunSafeService() != null )
+        {
+            return builder.getRunSafeService();
+        }
+        ThreadFactory threadFactory = builder.getThreadFactory();
+        if ( threadFactory == null )
+        {
+            threadFactory = ThreadUtils.newThreadFactory("SafeNotifyService");
+        }
+        return Executors.newSingleThreadExecutor(threadFactory);
     }
 
     private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder)
@@ -176,6 +194,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
     }
 
     @Override
+    public void runSafe(Runnable runnable)
+    {
+        runSafeService.execute(runnable);
+    }
+
+    @Override
     public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
     {
         return new WatcherRemovalFacade(this);
@@ -240,6 +264,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         schemaSet = parent.schemaSet;
         zk34CompatibilityMode = parent.zk34CompatibilityMode;
         ensembleTracker = null;
+        runSafeService = parent.runSafeService;
     }
 
     @Override
index 8a376f1..fb00ed9 100644 (file)
@@ -44,7 +44,7 @@ public class DistributedBarrier
         @Override
         public void process(WatchedEvent event)
         {
-            notifyFromWatcher();
+            client.postSafeNotify(DistributedBarrier.this);
         }
     };
 
@@ -142,9 +142,4 @@ public class DistributedBarrier
         }
         return result;
     }
-
-    private synchronized void notifyFromWatcher()
-    {
-        notifyAll();
-    }
 }
index b3bdf2c..2315178 100644 (file)
@@ -65,7 +65,12 @@ public class DistributedDoubleBarrier
         public void process(WatchedEvent event)
         {
             connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
-            notifyFromWatcher();
+            client.runSafe(() -> {
+                synchronized(DistributedDoubleBarrier.this) {
+                    hasBeenNotified.set(true);
+                    DistributedDoubleBarrier.this.notifyAll();
+                }
+            });
         }
     };
 
@@ -337,10 +342,4 @@ public class DistributedDoubleBarrier
 
         return result;
     }
-
-    private synchronized void notifyFromWatcher()
-    {
-        hasBeenNotified.set(true);
-        notifyAll();
-    }
 }
index 03e1088..6404888 100644 (file)
@@ -40,7 +40,6 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -88,7 +87,7 @@ public class InterProcessSemaphoreV2
         @Override
         public void process(WatchedEvent event)
         {
-            notifyFromWatcher();
+            client.postSafeNotify(InterProcessSemaphoreV2.this);
         }
     };
 
@@ -141,7 +140,7 @@ public class InterProcessSemaphoreV2
                         public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
                         {
                             InterProcessSemaphoreV2.this.maxLeases = newCount;
-                            notifyFromWatcher();
+                            client.postSafeNotify(InterProcessSemaphoreV2.this);
                         }
 
                         @Override
@@ -373,7 +372,7 @@ public class InterProcessSemaphoreV2
                 synchronized(this)
                 {
                     for(;;)
-                    {    
+                    {
                         List<String> children;
                         try
                         {
@@ -392,7 +391,7 @@ public class InterProcessSemaphoreV2
                             log.error("Sequential path not found: " + path);
                             return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                         }
-    
+
                         if ( children.size() <= maxLeases )
                         {
                             break;
@@ -479,9 +478,4 @@ public class InterProcessSemaphoreV2
             }
         };
     }
-
-    private synchronized void notifyFromWatcher()
-    {
-        notifyAll();
-    }
 }
index 46820af..a22bfb1 100644 (file)
@@ -66,7 +66,7 @@ public class LockInternals
         @Override
         public void process(WatchedEvent event)
         {
-            notifyFromWatcher();
+            client.postSafeNotify(LockInternals.this);
         }
     };
 
@@ -295,7 +295,7 @@ public class LockInternals
 
                     synchronized(this)
                     {
-                        try 
+                        try
                         {
                             // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                             client.getData().usingWatcher(watcher).forPath(previousSequencePath);
@@ -316,7 +316,7 @@ public class LockInternals
                                 wait();
                             }
                         }
-                        catch ( KeeperException.NoNodeException e ) 
+                        catch ( KeeperException.NoNodeException e )
                         {
                             // it has been deleted (i.e. lock released). Try to acquire again
                         }
@@ -351,9 +351,4 @@ public class LockInternals
             // ignore - already deleted (possibly expired session, etc.)
         }
     }
-
-    private synchronized void notifyFromWatcher()
-    {
-        notifyAll();
-    }
 }