Fix CURATOR-462 -- return lease created in org.apache.curator.framework.recipes.locks...
authorkrajcsovszkig-ms <krajcsovszkig-ms@users.noreply.github.com>
Tue, 10 Apr 2018 12:08:45 +0000 (14:08 +0200)
committerkrajcsovszkig-ms <krajcsovszkig-ms@users.noreply.github.com>
Tue, 10 Apr 2018 12:08:45 +0000 (14:08 +0200)
curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java

index 7bc98f5..03e1088 100644 (file)
@@ -331,6 +331,7 @@ public class InterProcessSemaphoreV2
 
     static volatile CountDownLatch debugAcquireLatch = null;
     static volatile CountDownLatch debugFailedGetChildrenLatch = null;
+    volatile CountDownLatch debugWaitLatch = null;
 
     private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
     {
@@ -353,6 +354,7 @@ public class InterProcessSemaphoreV2
         }
 
         Lease lease = null;
+        boolean success = false;
 
         try
         {
@@ -383,13 +385,11 @@ public class InterProcessSemaphoreV2
                             {
                                 debugFailedGetChildrenLatch.countDown();
                             }
-                            returnLease(lease); // otherwise the just created ZNode will be orphaned causing a dead lock
                             throw e;
                         }
                         if ( !children.contains(nodeName) )
                         {
                             log.error("Sequential path not found: " + path);
-                            returnLease(lease);
                             return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                         }
     
@@ -402,20 +402,32 @@ public class InterProcessSemaphoreV2
                             long thisWaitMs = getThisWaitMs(startMs, waitMs);
                             if ( thisWaitMs <= 0 )
                             {
-                                returnLease(lease);
                                 return InternalAcquireResult.RETURN_NULL;
                             }
+                            if ( debugWaitLatch != null )
+                            {
+                                debugWaitLatch.countDown();
+                            }
                             wait(thisWaitMs);
                         }
                         else
                         {
+                            if ( debugWaitLatch != null )
+                            {
+                                debugWaitLatch.countDown();
+                            }
                             wait();
                         }
                     }
+                    success = true;
                 }
             }
             finally
             {
+                if ( !success )
+                {
+                    returnLease(lease);
+                }
                 client.removeWatchers();
             }
         }
index 73c76e8..50f6bce 100644 (file)
@@ -778,4 +778,53 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             TestCleanState.closeAndTestClean(client);
         }
     }
+    
+    @Test
+    public void testInterruptAcquire() throws Exception
+    {
+        // CURATOR-462
+        final Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final InterProcessSemaphoreV2 s1 = new InterProcessSemaphoreV2(client, "/test", 1);
+            final InterProcessSemaphoreV2 s2 = new InterProcessSemaphoreV2(client, "/test", 1);
+            final InterProcessSemaphoreV2 s3 = new InterProcessSemaphoreV2(client, "/test", 1);
+            
+            final CountDownLatch debugWaitLatch = s2.debugWaitLatch = new CountDownLatch(1);
+            
+            // Acquire exclusive semaphore
+            Lease lease = s1.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            Assert.assertNotNull(lease);
+            
+            // Queue up another semaphore on the same path
+            Future<Object> handle = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
+                
+                @Override
+                public Object call() throws Exception {
+                    s2.acquire();
+                    return null;
+                }
+            });
+
+            // Wait until second lease is created and the wait is started for it to become active
+            Assert.assertTrue(timing.awaitLatch(debugWaitLatch));
+            
+            // Interrupt the wait
+            handle.cancel(true);
+            
+            // Assert that the second lease is gone
+            timing.sleepABit();
+            Assert.assertEquals(client.getChildren().forPath("/test/leases").size(), 1);
+            
+            // Assert that after closing the first (current) semaphore, we can acquire a new one
+            s1.returnLease(lease);
+            Assert.assertNotNull(s3.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
 }