Minor: Log full thread stacks in thread dumps.
authorPrateek Maheshwari <pmaheshwari@apache.org>
Wed, 12 Dec 2018 21:41:48 +0000 (13:41 -0800)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Wed, 12 Dec 2018 21:41:48 +0000 (13:41 -0800)
Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Cameron Lee <calee@linkedin.com>, Jagadish Venkatraman <vjagadish1989@gmail.com>

Closes #855 from prateekm/detailed-thread-dump

samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
samza-core/src/main/java/org/apache/samza/util/SamzaUncaughtExceptionHandler.java
samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java
samza-core/src/main/java/org/apache/samza/util/ThreadUtil.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/util/Util.scala

index 09f68f2..55fa8da 100644 (file)
@@ -25,7 +25,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ThreadUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class ContainerHeartbeatMonitor {
           scheduler.schedule(() -> {
               // On timeout of container shutting down, force exit.
               LOG.error("Graceful shutdown timeout expired. Force exiting.");
-              Util.logThreadDump("Thread dump at heartbeat monitor shutdown timeout.");
+              ThreadUtil.logThreadDump("Thread dump at heartbeat monitor shutdown timeout.");
               System.exit(1);
             }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
           onContainerExpired.run();
index 36e39ba..9d87e6e 100644 (file)
@@ -32,7 +32,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.util.HighResolutionClock;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ThreadUtil;
 
 
 /**
@@ -101,7 +101,7 @@ class TaskCallbackManager {
       Runnable timerTask = new Runnable() {
         @Override
         public void run() {
-          Util.logThreadDump("Thread dump at task callback timeout");
+          ThreadUtil.logThreadDump("Thread dump at task callback timeout");
           String msg = "Callback for task {} " + callback.taskName + " timed out after " + timeout + " ms.";
           callback.failure(new SamzaException(msg));
         }
index d94b2d9..5f60c51 100644 (file)
@@ -60,7 +60,7 @@ public class SamzaUncaughtExceptionHandler implements UncaughtExceptionHandler {
     System.err.println(msg);
     e.printStackTrace(System.err);
     try {
-      Util.logThreadDump("Thread dump from uncaught exception handler.");
+      ThreadUtil.logThreadDump("Thread dump from uncaught exception handler.");
       runnable.run();
     } catch (Throwable throwable) {
       // Ignore to avoid further exception propagation
index 3d75654..f947e05 100644 (file)
@@ -65,8 +65,8 @@ public class ShutdownUtil {
       LOG.info("Shutdown complete for {}", message);
       return true;
     } else {
-      LOG.error("Shutdown function for {} remains unfinished after timeout({}ms) or interruption", message, timeoutMs);
-      Util.logThreadDump(message);
+      LOG.error("Shutdown function for {} remains unfinished after timeout ({} ms) or interruption", message, timeoutMs);
+      ThreadUtil.logThreadDump(message);
       shutdownExecutorService.shutdownNow();
       return false;
     }
diff --git a/samza-core/src/main/java/org/apache/samza/util/ThreadUtil.java b/samza-core/src/main/java/org/apache/samza/util/ThreadUtil.java
new file mode 100644 (file)
index 0000000..0164abb
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThreadUtil {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtil.class);
+  private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
+
+  public static void logThreadDump(String message) {
+    try {
+      ThreadInfo[] threadInfo = THREAD_MX_BEAN.dumpAllThreads(true, true);
+      StringBuilder sb = new StringBuilder();
+      sb.append(message).append("\n");
+      for (ThreadInfo ti: threadInfo) {
+        sb.append(toString(ti)).append("\n");
+      }
+      LOGGER.info(sb.toString());
+    } catch (Exception e) {
+      LOGGER.error("Could not get and log a thread dump.", e);
+    }
+  }
+
+  /**
+   * Copy of ThreadInfo#toString() without the hardcoded MAX_FRAMES = 8 restriction on thread stack depth.
+   *
+   * Returns a string representation of this thread info.
+   * The format of this string depends on the implementation.
+   * The returned string will typically include
+   * the thread name, the getThreadId thread ID, its state,
+   * and a stack trace if any.
+   *
+   * @return a string representation of this thread info.
+   */
+  private static String toString(ThreadInfo info) {
+    StringBuilder sb = new StringBuilder("\"" + info.getThreadName() + "\""
+        + " Id=" + info.getThreadId() + " " + info.getThreadState());
+    if (info.getLockName() != null) {
+      sb.append(" on " + info.getLockName());
+    }
+    if (info.getLockOwnerName() != null) {
+      sb.append(" owned by \"" + info.getLockOwnerName() + "\" Id="
+          + info.getLockOwnerId());
+    }
+    if (info.isSuspended()) {
+      sb.append(" (suspended)");
+    }
+    if (info.isInNative()) {
+      sb.append(" (in native)");
+    }
+    sb.append('\n');
+    int i = 0;
+    for (; i < info.getStackTrace().length; i++) {
+      StackTraceElement ste = info.getStackTrace()[i];
+      sb.append("\tat " + ste.toString());
+      sb.append('\n');
+      if (i == 0 && info.getLockInfo() != null) {
+        Thread.State ts = info.getThreadState();
+        switch (ts) {
+          case BLOCKED:
+            sb.append("\t-  blocked on " + info.getLockInfo());
+            sb.append('\n');
+            break;
+          case WAITING:
+            sb.append("\t-  waiting on " + info.getLockInfo());
+            sb.append('\n');
+            break;
+          case TIMED_WAITING:
+            sb.append("\t-  waiting on " + info.getLockInfo());
+            sb.append('\n');
+            break;
+          default:
+        }
+      }
+
+      for (MonitorInfo mi : info.getLockedMonitors()) {
+        if (mi.getLockedStackDepth() == i) {
+          sb.append("\t-  locked " + mi);
+          sb.append('\n');
+        }
+      }
+    }
+    if (i < info.getStackTrace().length) {
+      sb.append("\t...");
+      sb.append('\n');
+    }
+
+    LockInfo[] locks = info.getLockedSynchronizers();
+    if (locks.length > 0) {
+      sb.append("\n\tNumber of locked synchronizers = " + locks.length);
+      sb.append('\n');
+      for (LockInfo li : locks) {
+        sb.append("\t- " + li);
+        sb.append('\n');
+      }
+    }
+    sb.append('\n');
+    return sb.toString();
+  }
+}
index e163d7d..4aa2ed8 100644 (file)
@@ -1091,7 +1091,7 @@ class SamzaContainer(
           info("Shutdown complete")
         } else {
           error("Did not shut down within %s ms, exiting." format shutdownMs)
-          Util.logThreadDump("Thread dump from Samza Container Shutdown Hook.")
+          ThreadUtil.logThreadDump("Thread dump from Samza Container Shutdown Hook.")
         }
       }
     }
index 53b7d19..24ee476 100644 (file)
@@ -23,7 +23,6 @@ package org.apache.samza.util
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config._
 import org.apache.samza.SamzaException
-import java.lang.management.ManagementFactory
 import java.net.Inet4Address
 import java.net.InetAddress
 import java.net.NetworkInterface
@@ -34,7 +33,6 @@ import scala.collection.JavaConverters._
 
 object Util extends Logging {
   val Random = new Random
-  val ThreadMxBean = ManagementFactory.getThreadMXBean
 
   /**
    * Make an environment variable string safe to pass.
@@ -114,19 +112,4 @@ object Util extends Logging {
       case _ => config
     }
   }
-
-  def logThreadDump(message: String): Unit = {
-    try {
-      val threadInfo = ThreadMxBean.dumpAllThreads(true, true)
-      val sb = new StringBuilder
-      sb.append(message).append("\n")
-      for (ti <- threadInfo) {
-        sb.append(ti.toString).append("\n")
-      }
-      info(sb)
-    } catch {
-      case e: Exception =>
-        info("Could not get and log a thread dump", e)
-    }
-  }
 }