SAMZA-1021 : Remove the redundent poll waiting inside AsyncRunLoop blockIfBusy
authorXinyu Liu <xiliu@linkedin.com>
Thu, 22 Sep 2016 18:42:13 +0000 (11:42 -0700)
committervjagadish1989 <jvenkatr@linkedin.com>
Thu, 22 Sep 2016 18:46:51 +0000 (11:46 -0700)
samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java

index a510bb0..9a21bf1 100644 (file)
@@ -201,32 +201,23 @@ public class AsyncRunLoop implements Runnable {
   }
 
   /**
-   * Block the runloop thread if all tasks are busy. Due to limitation of non-blocking for the flow control,
-   * we block the run loop when there are no runnable tasks, or all tasks are idle (no pending messages) while
-   * chooser is empty too. When a task worker finishes or window/commit completes, it will resume the runloop.
+   * Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
+   * it will resume the runloop.
    */
   private void blockIfBusy(IncomingMessageEnvelope envelope) {
     synchronized (latch) {
       while (!shutdownNow && throwable == null) {
         for (AsyncTaskWorker worker : taskWorkers.values()) {
-          if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) {
-            // should continue running since the worker state is ready and there is either new message
-            // or some pending operations for the worker
+          if (worker.state.isReady()) {
+            // should continue running if any worker state is ready
+            // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop
             return;
           }
         }
 
         try {
           log.trace("Block loop thread");
-
-          if (envelope == null) {
-            // If the envelope is null then we will wait for a poll interval, otherwise next choose() will
-            // return null immediately and we will have a busy loop
-            latch.wait(consumerMultiplexer.pollIntervalMs());
-            return;
-          } else {
-            latch.wait();
-          }
+          latch.wait();
         } catch (InterruptedException e) {
           throw new SamzaException("Run loop is interrupted", e);
         }
@@ -531,10 +522,6 @@ public class AsyncRunLoop implements Runnable {
       }
     }
 
-    private boolean hasPendingOps() {
-      return !pendingEnvelopQueue.isEmpty() || needCommit || needWindow;
-    }
-
     /**
      * Returns the next operation by this taskWorker
      */
@@ -616,4 +603,4 @@ public class AsyncRunLoop implements Runnable {
       return pendingEnvelope.envelope;
     }
   }
-}
\ No newline at end of file
+}