SAMZA-1741: fix issue that EH consumer taking too long to shutdown
authorHai Lu <halu@linkedin.com>
Fri, 8 Jun 2018 17:05:36 +0000 (10:05 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Fri, 8 Jun 2018 17:05:36 +0000 (10:05 -0700)
1.  lower the shutdown timeout from 1 min to 15 seconds
2. make sure EventHubManagers are shutdown in parallel
3. print a thread dump when we do fail during shutdown

Author: Hai Lu <halu@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>, Prateek <pmaheshw@linkedin.com>

Closes #548 from lhaiesp/master

samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java [new file with mode: 0644]

index 04e361f..3fa95c2 100644 (file)
@@ -28,17 +28,13 @@ import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -57,6 +53,7 @@ import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
 import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.ShutdownUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +99,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
   // Overall timeout for EventHubClient exponential backoff policy
   private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
-  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
 
   public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
   public static final String END_OF_STREAM = "-2";
@@ -352,17 +349,32 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   @Override
   public void stop() {
     LOG.info("Stopping event hub system consumer...");
-    List<CompletableFuture<Void>> futures = new ArrayList<>();
-    streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close()));
-    CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-    try {
-      future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-    } catch (ExecutionException | InterruptedException | TimeoutException e) {
-      LOG.warn("Failed to close receivers", e);
-    }
-    perPartitionEventHubManagers.values()
-        .parallelStream()
-        .forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+
+    // There could be potentially many Receivers and EventHubManagers, so close the managers in parallel
+    LOG.info("Start shutting down eventhubs receivers");
+    ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() {
+      @Override
+      public void run() {
+        try {
+          receiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          LOG.error("Failed to shutdown receiver.", e);
+        }
+      }
+    }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+
+    LOG.info("Start shutting down eventhubs managers");
+    ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager -> new Runnable() {
+      @Override
+      public void run() {
+        try {
+          manager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+        } catch (Exception e) {
+          LOG.error("Failed to shutdown eventhubs manager.", e);
+        }
+      }
+    }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+
     perPartitionEventHubManagers.clear();
     perStreamEventHubManagers.clear();
     isStarted = false;
diff --git a/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java b/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java
new file mode 100644 (file)
index 0000000..3d75654
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * Shutdown related utils
+ */
+public class ShutdownUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(ShutdownUtil.class);
+
+  /**
+   * A helper to facilitate shutting down a set of resources in parallel to enforce a bounded shutdown time.
+   * The helper function instantiates an {@link ExecutorService} to execute a list of shutdown tasks, and will
+   * await the termination for given timeout. If shutdown remains unfinished in the end, the whole thread dump
+   * will be printed to help debugging.
+   *
+   * The shutdown is performed with best-effort. Depending on the implementation of the shutdown function, resource
+   * leak might be possible.
+   *
+   * @param shutdownTasks the list of shutdown tasks that need to be executed in parallel
+   * @param message message that will show in the thread name and the thread dump
+   * @param timeoutMs timeout in ms
+   * @return true if all tasks terminate in the end
+   */
+  public static boolean boundedShutdown(List<Runnable> shutdownTasks, String message, long timeoutMs) {
+    ExecutorService shutdownExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(message + "-%d").setDaemon(true).build());
+    shutdownTasks.forEach(shutdownExecutorService::submit);
+    shutdownExecutorService.shutdown();
+    try {
+      shutdownExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Shutdown was interrupted for " + message, e);
+    }
+
+    if (shutdownExecutorService.isTerminated()) {
+      LOG.info("Shutdown complete for {}", message);
+      return true;
+    } else {
+      LOG.error("Shutdown function for {} remains unfinished after timeout({}ms) or interruption", message, timeoutMs);
+      Util.logThreadDump(message);
+      shutdownExecutorService.shutdownNow();
+      return false;
+    }
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java
new file mode 100644 (file)
index 0000000..d02619a
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.time.Duration;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestShutdownUtil {
+  @Test
+  public void testBoundedShutdown() throws Exception {
+    long longTimeout = Duration.ofSeconds(60).toMillis();
+    long shortTimeout = Duration.ofMillis(100).toMillis();
+
+    Runnable shortRunnable = () -> {
+      try {
+        Thread.sleep(shortTimeout);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+    };
+    long start = System.currentTimeMillis();
+    Assert.assertTrue("expect the shutdown task to terminate",
+        ShutdownUtil.boundedShutdown(Collections.singletonList(shortRunnable), "testLongTimeout", longTimeout));
+    long end = System.currentTimeMillis();
+    Assert.assertTrue("boundedShutdown should complete if the shutdown function completes earlier",
+        (end - start) < longTimeout / 2);
+
+    Runnable longRunnable = () -> {
+      try {
+        Thread.sleep(longTimeout);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+    };
+    start = System.currentTimeMillis();
+    Assert.assertFalse("expect the shutdown task to be unfinished",
+        ShutdownUtil.boundedShutdown(Collections.singletonList(longRunnable), "testShortTimeout", shortTimeout));
+    end = System.currentTimeMillis();
+    Assert.assertTrue("boundedShutdown should complete even if the shutdown function takes long time",
+        (end - start) < longTimeout / 2);
+  }
+}