SAMZA-1747: Add metric to measure effectiveness of host-affinity
authorJagadish <jvenkatraman@linkedin.com>
Thu, 14 Jun 2018 21:04:53 +0000 (14:04 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Thu, 14 Jun 2018 21:04:53 +0000 (14:04 -0700)
We require visibility into how effectively host-affinity performs. The goal is to help easily answer the following questions.
- How effectively is YARN matching my preferred-host requests
- When does Samza fallback to abandoning locality and issuing any-host requests?

design doc: https://docs.google.com/document/d/1oeNKDnG4JIGT2846us-jpnGW_RUjMjPIKDeEUlE_-jg/edit#

Author: Jagadish <jvenkatraman@linkedin.com>

Reviewers: Prateek M<pmaheshw@linkedin.com>

Closes #553 from vjagadish1989/hostaffinity-metrics

samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java

index 521a43b..9f1afed 100644 (file)
@@ -207,6 +207,11 @@ public abstract class AbstractContainerAllocator implements Runnable {
         preferredHost, containerID);
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
+    if (ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+      state.anyHostRequests.incrementAndGet();
+    } else {
+      state.preferredHostRequests.incrementAndGet();
+    }
   }
 
   /**
index fe462e7..d59a893 100644 (file)
@@ -79,6 +79,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
         boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
 
         if (expired) {
+          updateExpiryMetrics(request);
           if (resourceAvailableOnAnyHost) {
             log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost());
             runStreamProcessor(request, ResourceRequestState.ANY_HOST);
@@ -109,4 +110,13 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
     }
     return requestExpired;
   }
+
+  private void updateExpiryMetrics(SamzaResourceRequest request) {
+    String preferredHost = request.getPreferredHost();
+    if (ResourceRequestState.ANY_HOST.equals(preferredHost)) {
+      state.expiredAnyHostRequests.incrementAndGet();
+    } else {
+      state.expiredPreferredHostRequests.incrementAndGet();
+    }
+  }
 }
index 0dcaace..4e6fc33 100644 (file)
@@ -125,6 +125,14 @@ public class SamzaApplicationState {
 
   public final AtomicInteger matchedResourceRequests = new AtomicInteger(0);
 
+  public final AtomicInteger preferredHostRequests = new AtomicInteger(0);
+
+  public final AtomicInteger anyHostRequests = new AtomicInteger(0);
+
+  public final AtomicInteger expiredPreferredHostRequests = new AtomicInteger(0);
+
+  public final AtomicInteger expiredAnyHostRequests = new AtomicInteger(0);
+
   /**
    * Number of invalid container notifications.
    *
index c396ed6..15cb18f 100644 (file)
@@ -53,16 +53,20 @@ class ContainerProcessManagerMetrics(
     val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
     val mContainers = newGauge("container-count", () => state.containerCount)
     val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get())
-
     val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
-    val mLocalityMatchedRequests = newGauge(
-      "locality-matched",
-      () => {
-        if (state.containerRequests.get() != 0) {
-          state.matchedResourceRequests.get() / state.containerRequests.get()
-        } else {
-          0L
-        }
+    val mPreferredHostRequests = newGauge("preferred-host-requests", () => state.preferredHostRequests.get())
+    val mAnyHostRequests = newGauge("any-host-requests", () => state.anyHostRequests.get())
+    val mExpiredPreferredHostRequests = newGauge("expired-preferred-host-requests", () => state.expiredPreferredHostRequests.get())
+    val mExpiredAnyHostRequests = newGauge("expired-any-host-requests", () => state.expiredAnyHostRequests.get())
+
+    val mHostAffinityMatchPct = newGauge("host-affinity-match-pct", () => {
+      val numPreferredHostRequests = state.preferredHostRequests.get()
+      val numExpiredPreferredHostRequests = state.expiredPreferredHostRequests.get()
+      if (numPreferredHostRequests != 0) {
+            100.00 * (numPreferredHostRequests - numExpiredPreferredHostRequests) / numPreferredHostRequests
+          } else {
+            0L
+          }
       })
 
     jvm.start
index 490de51..b9fc2e5 100644 (file)
@@ -394,9 +394,17 @@ public class TestHostAwareContainerAllocator {
       Assert.fail("Timed out waiting for container-0 to launch");
     }
     // verify that the second preferred host request should expire and should trigger ANY_HOST requests
+    // wait for 4 requests to be made (2 preferred-host requests - one each on host-1 & host-2;  2 any-host requests)
     if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) {
       Assert.fail("Timed out waiting for resource requests");
     }
+    // verify 2 preferred host requests should have been made for host-1 and host-2
+    Assert.assertEquals(2, state.preferredHostRequests.get());
+    // verify both of them should have expired.
+    Assert.assertEquals(2, state.expiredPreferredHostRequests.get());
+    // verify there were at-least 2 any-host requests
+    Assert.assertTrue(state.anyHostRequests.get() >= 2);
+    Assert.assertTrue(state.expiredAnyHostRequests.get() <= state.anyHostRequests.get());
     // finally, provide a container from YARN after multiple requests
     containerAllocator.addResource(resource1);
     // verify all the test assertions