GIRAPH-1036: Allow mappers to fail early on exceptions
authorMaja Kabiljo <majakabiljo@fb.com>
Wed, 21 Oct 2015 01:19:36 +0000 (18:19 -0700)
committerMaja Kabiljo <majakabiljo@fb.com>
Thu, 22 Oct 2015 03:06:04 +0000 (20:06 -0700)
Summary:
Often when something fails in a mapper we see it stuck until its timeout passes. Digging through this issue I found two root causes:
- Many threads we are creating were not daemon, preventing process to exit, only main thread should be daemon
- When calling submit on ExecutorService, exceptions are not propagated back to the caller, unless get is called on the future. In ProgressableUtils.getResultsWithNCallables we were calling get on one by one future, causing us to have to wait for previous futures to finish before getting exception which happened in later one.

Test Plan: Run jobs in which I simulated exceptions on some partitions in loading, compute and storing phases, for each verified we exit quickly with exception clearly shown, and without this change we'd wait for timeout and other threads from same ProgressableUtils.getResultsWithNCallables to finish. Run a normal job successfully. mvn clean verify

Differential Revision: https://reviews.facebook.net/D49143

giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java

index e820f26..04afba5 100644 (file)
  */
 package org.apache.giraph.comm.messages.queue;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -60,8 +60,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   /** Executor that processes messages in background */
   private static final ExecutorService EXECUTOR_SERVICE =
       Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setDaemon(true)
-              .setNameFormat("AsyncMessageStoreWrapper-%d").build());
+          ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
 
   /** Number of threads that will process messages in background */
   private final int threadsCount;
index 749e41e..d5b0e20 100644 (file)
 package org.apache.giraph.ooc;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -152,7 +152,7 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
         }
       };
     checkMemoryExecutor = Executors.newSingleThreadExecutor(
-        new ThreadFactoryBuilder().setNameFormat("check-memory").build());
+        ThreadUtils.createThreadFactory("check-memory"));
     checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>(
         checkMemoryCallableFactory.newCallable(0)));
 
@@ -164,9 +164,9 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
               AdaptiveOutOfCoreEngine.this, serviceWorker);
         }
       };
-    outOfCoreProcessorExecutor = Executors
-        .newFixedThreadPool(numOocThreads,
-            new ThreadFactoryBuilder().setNameFormat("ooc-%d").build());
+    outOfCoreProcessorExecutor =
+        Executors.newFixedThreadPool(numOocThreads,
+            ThreadUtils.createThreadFactory("ooc-%d"));
     oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads);
     for (int i = 0; i < numOocThreads; ++i) {
       Future<Void> future = outOfCoreProcessorExecutor.submit(
index f90337f..4282d35 100644 (file)
@@ -94,6 +94,7 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
         }
       }
     });
+    thread.setDaemon(true);
     thread.start();
   }
 
index 08a7914..3008248 100644 (file)
@@ -20,14 +20,17 @@ package org.apache.giraph.utils;
 
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
+
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.util.concurrent.EventExecutorGroup;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -43,6 +46,11 @@ public class ProgressableUtils {
       Logger.getLogger(ProgressableUtils.class);
   /** Msecs to refresh the progress meter (one minute) */
   private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
+  /**
+   * When getting results with many threads, how many milliseconds to wait
+   * on each when looping through them
+   */
+  private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
 
   /** Do not instantiate. */
   private ProgressableUtils() {
@@ -217,21 +225,42 @@ public class ProgressableUtils {
   public static <R> List<R> getResultsWithNCallables(
       CallableFactory<R> callableFactory, int numThreads,
       String threadNameFormat, Progressable progressable) {
-    ExecutorService executorService =
-        Executors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());
-    List<Future<R>> futures = Lists.newArrayListWithCapacity(numThreads);
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
+        ThreadUtils.createThreadFactory(threadNameFormat));
+    HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
     for (int i = 0; i < numThreads; i++) {
       Callable<R> callable = callableFactory.newCallable(i);
       Future<R> future = executorService.submit(
           new LogStacktraceCallable<R>(callable));
-      futures.add(future);
+      futures.put(i, future);
     }
     executorService.shutdown();
-    List<R> futureResults = Lists.newArrayListWithCapacity(numThreads);
-    for (Future<R> future : futures) {
-      R result = ProgressableUtils.getFutureResult(future, progressable);
-      futureResults.add(result);
+    List<R> futureResults =
+        new ArrayList<>(Collections.<R>nCopies(numThreads, null));
+    // Loop through the futures until all are finished
+    // We do this in order to get any exceptions from the futures early
+    while (!futures.isEmpty()) {
+      Iterator<Map.Entry<Integer, Future<R>>> iterator =
+          futures.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<Integer, Future<R>> entry = iterator.next();
+        R result;
+        try {
+          // Try to get result from the future
+          result = entry.getValue().get(
+              MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IllegalStateException("Exception occurred", e);
+        } catch (TimeoutException e) {
+          // If result is not ready yet just keep waiting
+          continue;
+        }
+        // Result is ready, put it to final results
+        futureResults.set(entry.getKey(), result);
+        // Remove current future since we are done with it
+        iterator.remove();
+      }
+      progressable.progress();
     }
     return futureResults;
   }
index 844f929..f3ecfb0 100644 (file)
@@ -107,6 +107,7 @@ public class ReactiveJMapHistoDumper extends
       }
     });
     thread.setName("ReactiveJMapHistoDumperSupervisorThread");
+    thread.setDaemon(true);
     thread.start();
   }
 
index a235ff4..9518bdc 100644 (file)
@@ -45,10 +45,21 @@ public class ThreadUtils {
       String nameFormat,
       Thread.UncaughtExceptionHandler exceptionHandler) {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder().
-        setNameFormat(nameFormat);
+        setNameFormat(nameFormat).setDaemon(true);
     if (exceptionHandler != null) {
       builder.setUncaughtExceptionHandler(exceptionHandler);
     }
     return builder.build();
   }
+
+  /**
+   * Creates new thread factory with specified thread name format.
+   *
+   * @param nameFormat defines naming format for threads created by
+   *                   thread factory
+   * @return new thread factory with specified thread name format
+   */
+  public static ThreadFactory createThreadFactory(String nameFormat) {
+    return createThreadFactory(nameFormat, null);
+  }
 }
index dae9963..f37a48d 100644 (file)
@@ -62,7 +62,8 @@ public class WorkerProgressWriter {
           }
         }
       }
-    });
+    }, "workerProgressThread");
+    writerThread.setDaemon(true);
     writerThread.start();
   }