SAMZA-1699: Fix NPE in ClusterResourceManager
author Jagadish <jvenkatraman@linkedin.com>
Fri, 4 May 2018 21:02:35 +0000 (14:02 -0700)
committer Jagadish <jvenkatraman@linkedin.com>
Fri, 4 May 2018 21:02:35 +0000 (14:02 -0700)
When the ClusterResourceManager receives a notification that a container has started, it moves the container from its "pending" queue to its "running" queue.
Meanwhile, another thread may remove the mapping for that key. For example:

NMCallbackThread-1:
```
pendingYarnContainers.remove(key);
```

NMCallbackThread-2:
```
for (String key : pendingYarnContainers.keySet()) {
  yarnContainer = pendingYarnContainers.get(key); // null if thread 1's remove() ran between keySet() traversal and this get()
}
```
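The interleaving above can be reproduced deterministically on a single thread. This is a minimal sketch, assuming `pendingYarnContainers` is a `ConcurrentHashMap` (whose weakly consistent iterators do not throw `ConcurrentModificationException` when the map changes mid-iteration). `Container`, the `interleave` hook, and the container ids are hypothetical stand-ins, not Samza APIs. The hook removes the key after the `keySet()` iterator has handed it out but before `get()` runs, which is exactly the window described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class PendingLookupRace {
  // Hypothetical stand-in for YarnContainer: only the container id matters here.
  static final class Container {
    private final String id;
    Container(String id) { this.id = id; }
    String id() { return id; }
  }

  // Mirrors the loop in getPendingSamzaContainerId. The 'interleave' hook is
  // hypothetical: it runs between the keySet() traversal and get(), standing in
  // for NMCallbackThread-1 removing the mapping at exactly the wrong moment.
  static String find(Map<String, Container> pending, String containerId,
                     boolean guarded, Consumer<String> interleave) {
    for (String key : pending.keySet()) {
      interleave.accept(key);
      Container c = pending.get(key); // null if the key was removed meanwhile
      boolean match = guarded
          ? c != null && c.id().equals(containerId)
          : c.id().equals(containerId); // throws NPE when c == null
      if (match) {
        return key;
      }
    }
    return null;
  }

  // One pending container keyed by Samza container id "0" (hypothetical values).
  static Map<String, Container> pending() {
    Map<String, Container> m = new ConcurrentHashMap<>();
    m.put("0", new Container("container_e01_000002"));
    return m;
  }

  // Pre-fix behavior: the unguarded loop dereferences the null value.
  static boolean unguardedThrowsUnderRace() {
    Map<String, Container> m = pending();
    try {
      find(m, "container_e01_000002", false, m::remove);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // Post-fix behavior: the null check skips the vanished entry.
  static String guardedLookup(boolean race) {
    Map<String, Container> m = pending();
    Consumer<String> hook = race ? m::remove : key -> { };
    return find(m, "container_e01_000002", true, hook);
  }

  public static void main(String[] args) {
    System.out.println("unguarded NPE under race: " + unguardedThrowsUnderRace());
    System.out.println("guarded, race: " + guardedLookup(true));
    System.out.println("guarded, no race: " + guardedLookup(false));
  }
}
```

Under the simulated race the unguarded loop throws `NullPointerException`, while the guarded loop skips the removed entry and returns null; without the race, the guarded loop still finds key "0".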

Author: Jagadish <jvenkatraman@linkedin.com>

Reviewers: Prateek M <pmaheshw@linkedin.com>

Closes #504 from vjagadish/npe-fix-async

samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java

index 407768c..79a9083 100644
@@ -712,7 +712,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   private String getPendingSamzaContainerId(ContainerId containerId) {
     for (String samzaContainerId: state.pendingYarnContainers.keySet()) {
       YarnContainer yarnContainer = state.pendingYarnContainers.get(samzaContainerId);
-      if (yarnContainer.id().equals(containerId)) {
+      if (yarnContainer != null && yarnContainer.id().equals(containerId)) {
         return samzaContainerId;
       }
     }
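The patch keeps the `keySet()`/`get()` loop and skips null values. One alternative is to iterate `entrySet()` so the key and value come from the same traversal step and no second lookup is needed. The sketch below makes the same assumption that the map is a `ConcurrentHashMap`, which never stores null values, and `Container` and the ids are hypothetical. It illustrates a design option and is not what this commit does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingLookupByEntry {
  // Hypothetical stand-in for YarnContainer; only the id is needed.
  static final class Container {
    private final String id;
    Container(String id) { this.id = id; }
    String id() { return id; }
  }

  // Alternative to the keySet()+get() loop: each entry pairs a key with the
  // value read during traversal, and ConcurrentHashMap rejects null values,
  // so getValue() is non-null even if the key is removed concurrently.
  static String getPendingSamzaContainerId(Map<String, Container> pending, String containerId) {
    for (Map.Entry<String, Container> e : pending.entrySet()) {
      if (e.getValue().id().equals(containerId)) {
        return e.getKey();
      }
    }
    return null; // no pending Samza container for this YARN container id
  }

  public static void main(String[] args) {
    Map<String, Container> pending = new ConcurrentHashMap<>();
    pending.put("0", new Container("container_e01_000002"));
    pending.put("1", new Container("container_e01_000003"));
    System.out.println(getPendingSamzaContainerId(pending, "container_e01_000003"));
    System.out.println(getPendingSamzaContainerId(pending, "container_e01_000009"));
  }
}
```

The trade-off is that the entry is a snapshot. This version can return the Samza container id for an entry that was removed a moment earlier. The patched loop instead skips such an entry whenever the removal lands before its `get()`.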