Rebase with the recent master before merge. 874/head
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Tue, 12 Feb 2019 16:23:04 +0000 (08:23 -0800)
committerShanthoosh Venkataraman <spvenkat@usc.edu>
Tue, 12 Feb 2019 16:23:04 +0000 (08:23 -0800)
1. Fix the TestZkLocalApplicationRunner test factoring in the active task
into account.
2. Improve logging in SSPGrouperProxy.

samza-core/src/main/java/org/apache/samza/container/grouper/stream/SSPGrouperProxy.java
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index 1ef7269..491d1d6 100644 (file)
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.HashSet;
 import com.google.common.base.Preconditions;
@@ -99,18 +100,22 @@ public class SSPGrouperProxy {
           Integer previousStreamPartitionCount = previousStreamToPartitionCount.getOrDefault(systemStream, 0);
           Integer currentStreamPartitionCount = currentStreamToPartitionCount.getOrDefault(systemStream, 0);
 
+          TaskName previouslyAssignedTask = null;
           if (previousStreamPartitionCount > 0 && !currentStreamPartitionCount.equals(previousStreamPartitionCount)) {
-            LOGGER.info("Partition count of system stream: {} had changed from: {} to: {} partitions. Performing partition reassignment.", systemStream, previousStreamPartitionCount, currentStreamPartitionCount);
+            LOGGER.info("Partition count of system stream: {} had changed from: {} to: {} partitions.", systemStream, previousStreamPartitionCount, currentStreamPartitionCount);
 
             SystemStreamPartition previousSystemStreamPartition = systemStreamPartitionMapper.getPreviousSSP(currentSystemStreamPartition, previousStreamPartitionCount, currentStreamPartitionCount);
-            TaskName previouslyAssignedTask = previousSSPToTask.get(previousSystemStreamPartition);
+            previouslyAssignedTask = previousSSPToTask.get(previousSystemStreamPartition);
+          } else if (previousSSPToTask.containsKey(currentSystemStreamPartition)) {
+            // If a previous mapping for a task to system stream partition exists, then move the SSP to it.
+            previouslyAssignedTask = previousSSPToTask.get(currentSystemStreamPartition);
+          }
 
+          if (previouslyAssignedTask != null && !Objects.equals(previouslyAssignedTask, currentlyAssignedTask)) {
             LOGGER.info("Moving systemStreamPartition: {} from task: {} to task: {}.", currentSystemStreamPartition, currentlyAssignedTask, previouslyAssignedTask);
 
             taskToPartitionGroup.get(currentlyAssignedTask).removeSSP(currentSystemStreamPartition);
             taskToPartitionGroup.get(previouslyAssignedTask).addSSP(currentSystemStreamPartition);
-          } else {
-            LOGGER.debug("No partition change in SystemStream: {}. Skipping partition reassignment.", systemStream);
           }
         }
       }
index 1f5b24e..8249bcc 100644 (file)
@@ -60,6 +60,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metadatastore.MetadataStoreFactory;
@@ -834,6 +835,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     }
 
     Assert.assertEquals(expectedTaskAssignments, actualTaskAssignments);
+    Assert.assertEquals(32, jobModel.maxChangeLogStreamPartitions);
   }
 
   /**
@@ -935,6 +937,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Validate that the new JobModel has the expected task assignments.
     actualTaskAssignments = getTaskAssignments(jobModel);
     Assert.assertEquals(expectedTaskAssignments, actualTaskAssignments);
+    Assert.assertEquals(32, jobModel.maxChangeLogStreamPartitions);
   }
 
   /**
@@ -950,7 +953,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         if (!taskAssignments.containsKey(taskModel.getTaskName())) {
           taskAssignments.put(taskModel.getTaskName(), new HashSet<>());
         }
-        taskAssignments.get(taskModel.getTaskName()).addAll(taskModel.getSystemStreamPartitions());
+        if (taskModel.getTaskMode() == TaskMode.Active) {
+          taskAssignments.get(taskModel.getTaskName()).addAll(taskModel.getSystemStreamPartitions());
+        }
       }
     }
     return taskAssignments;