SAMZA-866: Refactor container allocator classes
authorJacob Maes <jacob.maes@gmail.com>
Wed, 10 Feb 2016 07:30:21 +0000 (23:30 -0800)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Wed, 10 Feb 2016 07:30:21 +0000 (23:30 -0800)
samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java
samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java

index 9ee2dac..2e192ee 100644 (file)
@@ -24,6 +24,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.samza.config.YarnConfig;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class is responsible for making requests for containers to the AM and also, assigning a container to run on an allocated resource.
@@ -34,6 +37,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * See {@link org.apache.samza.job.yarn.ContainerAllocator} and {@link org.apache.samza.job.yarn.HostAwareContainerAllocator}
  */
 public abstract class AbstractContainerAllocator implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class);
+
   public static final String ANY_HOST = ContainerRequestState.ANY_HOST;
   public static final int DEFAULT_PRIORITY = 0;
   public static final int DEFAULT_CONTAINER_MEM = 1024;
@@ -45,9 +50,6 @@ public abstract class AbstractContainerAllocator implements Runnable {
   protected final int containerMaxMemoryMb;
   protected final int containerMaxCpuCore;
 
-  @Override
-  public abstract void run();
-
   // containerRequestState indicate the state of all unfulfilled container requests and allocated containers
   protected final ContainerRequestState containerRequestState;
 
@@ -66,6 +68,31 @@ public abstract class AbstractContainerAllocator implements Runnable {
     this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores();
   }
 
+  /**
+   * Continuously assigns requested containers to the allocated containers provided by the cluster manager.
+   * The loop frequency is governed by thread sleeps for ALLOCATOR_SLEEP_TIME ms.
+   *
+   * Terminates when the isRunning flag is cleared.
+   */
+  @Override
+  public void run() {
+    while(isRunning.get()) {
+      try {
+        assignContainerRequests();
+        Thread.sleep(ALLOCATOR_SLEEP_TIME);
+      } catch (InterruptedException e) {
+        log.info("Got InterruptedException in AllocatorThread.", e);
+      } catch (Exception e) {
+        log.error("Got unknown Exception in AllocatorThread.", e);
+      }
+    }
+  }
+
+  /**
+   * Assigns the container requests from the queue to the allocated containers from the cluster manager and
+   * runs them.
+   */
+  protected abstract void assignContainerRequests();
 
   /**
    * Called during initial request for containers
index 7c57a86..31fcc57 100644 (file)
  */
 package org.apache.samza.job.yarn;
 
+import java.util.List;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.samza.config.YarnConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.List;
 
 /**
  * This is the default allocator thread that will be used by SamzaTaskManager.
@@ -42,36 +42,28 @@ public class ContainerAllocator extends AbstractContainerAllocator {
   }
 
   /**
-   * During the run() method, the thread sleeps for ALLOCATOR_SLEEP_TIME ms. It tries to allocate any unsatisfied
-   * request that is still in the request queue (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
+   * This method tries to allocate any unsatisfied request that is still in the request queue
+   * (See requests in {@link org.apache.samza.job.yarn.ContainerRequestState})
    * with allocated containers, if any.
    *
    * Since host-affinity is not enabled, all allocated container resources are buffered in the list keyed by "ANY_HOST".
    * */
   @Override
-  public void run() {
-    while(isRunning.get()) {
-      try {
-        List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
-        while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
-          SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
-          Container container = allocatedContainers.get(0);
-
-          // Update state
-          containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
+  public void assignContainerRequests() {
+    List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+    while (!containerRequestState.getRequestsQueue().isEmpty() && allocatedContainers != null && allocatedContainers.size() > 0) {
+      SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+      Container container = allocatedContainers.get(0);
 
-          // Cancel request and run container
-          log.info("Running {} on {}", request.expectedContainerId, container.getId());
-          containerUtil.runContainer(request.expectedContainerId, container);
-        }
+      // Update state
+      containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
 
-        // If requestQueue is empty, all extra containers in the buffer should be released.
-        containerRequestState.releaseExtraContainers();
-
-        Thread.sleep(ALLOCATOR_SLEEP_TIME);
-      } catch (InterruptedException e) {
-        log.info("Got InterruptedException in AllocatorThread. Pending Container request(s) cannot be fulfilled!!", e);
-      }
+      // Cancel request and run container
+      log.info("Running {} on {}", request.expectedContainerId, container.getId());
+      containerUtil.runContainer(request.expectedContainerId, container);
     }
+
+    // If requestQueue is empty, all extra containers in the buffer should be released.
+    containerRequestState.releaseExtraContainers();
   }
 }
index ab3061e..54db5e5 100644 (file)
  */
 package org.apache.samza.job.yarn;
 
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class maintains the state variables for all the container requests and the allocated containers returned
@@ -205,35 +204,35 @@ public class ContainerRequestState {
   public synchronized int releaseExtraContainers() {
     int numReleasedContainers = 0;
 
-    if (hostAffinityEnabled) {
-      if (requestsQueue.isEmpty()) {
-        log.debug("Container Requests Queue is empty.");
+    if (requestsQueue.isEmpty()) {
+      log.debug("Container Requests Queue is empty.");
 
+      if (hostAffinityEnabled) {
         List<String> allocatedHosts = getAllocatedHosts();
         for (String host : allocatedHosts) {
-          List<Container> containers = getContainersOnAHost(host);
-          if (containers != null) {
-            for (Container c : containers) {
-              log.info("Releasing extra container {} allocated on {}", c.getId(), host);
-              amClient.releaseAssignedContainer(c.getId());
-              numReleasedContainers++;
-            }
-          }
+          numReleasedContainers += releaseContainersForHost(host);
         }
-        clearState();
+      } else {
+        numReleasedContainers += releaseContainersForHost(ANY_HOST);
       }
-    } else {
-      if (requestsQueue.isEmpty()) {
-        log.debug("No more pending requests in Container Requests Queue.");
+      clearState();
+    }
+    return numReleasedContainers;
+  }
 
-        List<Container> availableContainers = getContainersOnAHost(ANY_HOST);
-        while(availableContainers != null && !availableContainers.isEmpty()) {
-          Container c = availableContainers.remove(0);
-          log.info("Releasing extra allocated container - {}", c.getId());
-          amClient.releaseAssignedContainer(c.getId());
-          numReleasedContainers++;
-        }
-        clearState();
+  /**
+   * Releases all allocated containers for the specified host.
+   * @param host  the host for which the containers should be released.
+   * @return      the number of containers released.
+   */
+  private int releaseContainersForHost(String host) {
+    int numReleasedContainers = 0;
+    List<Container> containers = getContainersOnAHost(host);
+    if (containers != null) {
+      for (Container c : containers) {
+        log.info("Releasing extra container {} allocated on {}", c.getId(), host);
+        amClient.releaseAssignedContainer(c.getId());
+        numReleasedContainers++;
       }
     }
     return numReleasedContainers;
@@ -242,6 +241,7 @@ public class ContainerRequestState {
   /**
    * Clears all the state variables
    * Performed when there are no more unfulfilled requests
+   * This is not synchronized because it is private.
    */
   private void clearState() {
     allocatedContainers.clear();
index ff22dbf..8e1db77 100644 (file)
  */
 package org.apache.samza.job.yarn;
 
+import java.util.List;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.samza.config.YarnConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.List;
 
 /**
  * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation.
@@ -55,66 +55,49 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
    * allocatedContainers buffer keyed by "ANY_HOST".
    */
   @Override
-  public void run() {
-    try {
-      while (isRunning.get()) {
-        while (!containerRequestState.getRequestsQueue().isEmpty()) {
-          SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
-          String preferredHost = request.getPreferredHost();
-          int expectedContainerId = request.getExpectedContainerId();
+  public void assignContainerRequests() {
+    while (!containerRequestState.getRequestsQueue().isEmpty()) {
+      SamzaContainerRequest request = containerRequestState.getRequestsQueue().peek();
+      String preferredHost = request.getPreferredHost();
+      int expectedContainerId = request.getExpectedContainerId();
 
-          log.info(
-              "Handling request for container id {} on preferred host {}",
-              expectedContainerId,
-              preferredHost);
+      log.info("Handling request for container id {} on preferred host {}", expectedContainerId, preferredHost);
 
-          List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
-          if (allocatedContainers != null && allocatedContainers.size() > 0) {
-            // Found allocated container at preferredHost
-            Container container = allocatedContainers.get(0);
+      List<Container> allocatedContainers = containerRequestState.getContainersOnAHost(preferredHost);
+      if (allocatedContainers != null && allocatedContainers.size() > 0) {
+        // Found allocated container at preferredHost
+        Container container = allocatedContainers.get(0);
 
-            containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
+        containerRequestState.updateStateAfterAssignment(request, preferredHost, container);
 
+        log.info("Running {} on {}", expectedContainerId, container.getId());
+        containerUtil.runMatchedContainer(expectedContainerId, container);
+      } else {
+        // No allocated container on preferredHost
+        log.info("Did not find any allocated containers on preferred host {} for running container id {}",
+            preferredHost, expectedContainerId);
+        boolean expired = requestExpired(request);
+        allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
+        if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
+          log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't "
+                  + "find any free allocated containers in the buffer. Breaking out of loop.",
+              request.getRequestTimestamp(), CONTAINER_REQUEST_TIMEOUT);
+          break;
+        } else {
+          if (allocatedContainers.size() > 0) {
+            Container container = allocatedContainers.get(0);
+            log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with "
+                    + "timestamp {} to container {}",
+                new Object[]{String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()});
+            containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
             log.info("Running {} on {}", expectedContainerId, container.getId());
-            containerUtil.runMatchedContainer(expectedContainerId, container);
-          } else {
-            // No allocated container on preferredHost
-            log.info(
-                "Did not find any allocated containers on preferred host {} for running container id {}",
-                preferredHost,
-                expectedContainerId);
-            boolean expired = requestExpired(request);
-            allocatedContainers = containerRequestState.getContainersOnAHost(ANY_HOST);
-            if (!expired || allocatedContainers == null || allocatedContainers.size() == 0) {
-              log.info("Either the request timestamp {} is greater than container request timeout {}ms or we couldn't " +
-                      "find any free allocated containers in the buffer. Breaking out of loop.",
-                  request.getRequestTimestamp(),
-                  CONTAINER_REQUEST_TIMEOUT);
-              break;
-            } else {
-              if (allocatedContainers.size() > 0) {
-                Container container = allocatedContainers.get(0);
-                log.info("Found available containers on ANY_HOST. Assigning request for container_id {} with " +
-                        "timestamp {} to container {}",
-                    new Object[] { String.valueOf(expectedContainerId), request.getRequestTimestamp(), container.getId()
-                });
-                containerRequestState.updateStateAfterAssignment(request, ANY_HOST, container);
-                log.info("Running {} on {}", expectedContainerId, container.getId());
-                containerUtil.runContainer(expectedContainerId, container);
-              }
-            }
+            containerUtil.runContainer(expectedContainerId, container);
           }
         }
-        // Release extra containers and update the entire system's state
-        containerRequestState.releaseExtraContainers();
-
-        Thread.sleep(ALLOCATOR_SLEEP_TIME);
       }
-    } catch (InterruptedException ie) {
-      log.info("Got an InterruptedException in HostAwareContainerAllocator thread!", ie);
-    } catch (Exception e) {
-      log.info("Got an unknown Exception in HostAwareContainerAllocator thread!", e);
     }
+    // Release extra containers and update the entire system's state
+    containerRequestState.releaseExtraContainers();
   }
 
   private boolean requestExpired(SamzaContainerRequest request) {