SAMZA-1347: GroupByContainerIds NPE if containerIds list is null
authorJacob Maes <jmaes@linkedin.com>
Tue, 27 Jun 2017 18:42:51 +0000 (11:42 -0700)
committerJacob Maes <jmaes@linkedin.com>
Tue, 27 Jun 2017 18:42:51 +0000 (11:42 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Boris Shkolnik <boryas@apache.org>

Closes #233 from jmakes/samza-1347

samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java

index f2d88cd..651dca7 100644 (file)
@@ -46,12 +46,6 @@ public class GroupByContainerIds implements TaskNameGrouper {
 
   @Override
   public Set<ContainerModel> group(Set<TaskModel> tasks) {
-    if (tasks.isEmpty())
-      throw new IllegalArgumentException("cannot group an empty set");
-
-    if (startContainerCount > tasks.size())
-      throw new IllegalArgumentException("number of containers="  + startContainerCount + " is bigger than number of tasks=" + tasks.size());
-
     List<String> containerIds = new ArrayList<>(startContainerCount);
     for (int i = 0; i < startContainerCount; i++) {
       containerIds.add(String.valueOf(i));
@@ -60,6 +54,12 @@ public class GroupByContainerIds implements TaskNameGrouper {
   }
 
   public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
+    if (containersIds == null)
+      return this.group(tasks);
+
+    if (containersIds.isEmpty())
+      throw new IllegalArgumentException("Must have at least one container");
+
     if (tasks.isEmpty())
       throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
           .toString(containersIds.toArray()));
@@ -67,9 +67,6 @@ public class GroupByContainerIds implements TaskNameGrouper {
     if (containersIds.size() > tasks.size())
       throw new IllegalArgumentException("number of containers "  + containersIds.size() + " is bigger than number of tasks " + tasks.size());
 
-    if (containersIds == null)
-      return this.group(tasks);
-
     int containerCount = containersIds.size();
 
     // Sort tasks by taskName.
index 62131fe..cd2cc3d 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.container.grouper.task;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -114,6 +115,40 @@ public class TestGroupByContainerIds {
   }
 
   @Test
+  public void testGroupWithNullContainerIds() {
+    Set<TaskModel> taskModels = generateTaskModels(5);
+
+    Set<ContainerModel> containers = buildSimpleGrouper(2).group(taskModels, null);
+
+    Map<String, ContainerModel> containersMap = new HashMap<>();
+    for (ContainerModel container : containers) {
+      containersMap.put(container.getProcessorId(), container);
+    }
+
+    assertEquals(2, containers.size());
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
+    assertNotNull(container0);
+    assertNotNull(container1);
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
+    assertEquals(3, container0.getTasks().size());
+    assertEquals(2, container1.getTasks().size());
+    assertTrue(container0.getTasks().containsKey(getTaskName(0)));
+    assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+    assertTrue(container0.getTasks().containsKey(getTaskName(4)));
+    assertTrue(container1.getTasks().containsKey(getTaskName(1)));
+    assertTrue(container1.getTasks().containsKey(getTaskName(3)));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGroupWithEmptyContainerIds() {
+    Set<TaskModel> taskModels = generateTaskModels(5);
+
+    buildSimpleGrouper(2).group(taskModels, Collections.emptyList());
+  }
+
+  @Test
   public void testGroupHappyPathWithListOfContainers() {
     Set<TaskModel> taskModels = generateTaskModels(5);