SAMZA-1172: Fix for the topological sort to handle single-node loop
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Wed, 29 Mar 2017 17:49:25 +0000 (10:49 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Wed, 29 Mar 2017 17:49:25 +0000 (10:49 -0700)
In the processor graph, the topological sort missed adding to the visited set during graph traversal. This caused wrong graph being generated for single-node loop. This is fixed in the patch.

Also fixed the maxPartition method not handling empty collection correctly.

Added a few new unit tests for these. Also adjust the timing of previous async commit unit tests so it can run more reliably. Long term wise we need to fix the timer inside the AsyncRunLoop tests.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jacob Maes <jmakes@apache.org>

Closes #100 from xinyuiscool/SAMZA-1172

samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java

index 77790a8..ca2e71e 100644 (file)
@@ -302,8 +302,8 @@ public class ExecutionPlanner {
     }
   }
 
-  private static int maxPartition(Collection<StreamEdge> edges) {
-    return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).get();
+  /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
+    return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
 
   private static StreamSpec createStreamSpec(StreamEdge edge) {
index d94a9eb..13755ae 100644 (file)
@@ -276,6 +276,10 @@ public class ProcessorGraph {
    */
   /* package private */ List<ProcessorNode> topologicalSort() {
     Collection<ProcessorNode> pnodes = nodes.values();
+    if (pnodes.size() == 1) {
+      return new ArrayList<>(pnodes);
+    }
+
     Queue<ProcessorNode> q = new ArrayDeque<>();
     Map<String, Long> indegree = new HashMap<>();
     Set<ProcessorNode> visited = new HashSet<>();
@@ -337,6 +341,7 @@ public class ProcessorGraph {
           }
           // start from the node with minimal input edge again
           q.add(minNode);
+          visited.add(minNode);
         } else {
           // all the remaining nodes should be reachable from sources
           // start from sources again to find the next node that hasn't been visited
@@ -344,6 +349,7 @@ public class ProcessorGraph {
               .filter(node -> !visited.contains(node))
               .findAny().get();
           q.add(nextNode);
+          visited.add(nextNode);
         }
       }
     }
index fa02e04..b69eec6 100644 (file)
@@ -20,6 +20,9 @@
 package org.apache.samza.execution;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +47,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -279,4 +283,23 @@ public class TestExecutionPlanner {
         assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
       });
   }
+
+  @Test
+  public void testMaxPartition() {
+    Collection<StreamEdge> edges = new ArrayList<>();
+    StreamEdge edge = new StreamEdge(input1);
+    edge.setPartitionCount(2);
+    edges.add(edge);
+    edge = new StreamEdge(input2);
+    edge.setPartitionCount(32);
+    edges.add(edge);
+    edge = new StreamEdge(input3);
+    edge.setPartitionCount(16);
+    edges.add(edge);
+
+    assertEquals(ExecutionPlanner.maxPartition(edges), 32);
+
+    edges = Collections.emptyList();
+    assertEquals(ExecutionPlanner.maxPartition(edges), StreamEdge.PARTITIONS_UNKNOWN);
+  }
 }
index 2bdf529..2f89d91 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -34,6 +35,8 @@ public class TestProcessorGraph {
 
   ProcessorGraph graph1;
   ProcessorGraph graph2;
+  ProcessorGraph graph3;
+  ProcessorGraph graph4;
   int streamSeq = 0;
 
   private StreamSpec genStream() {
@@ -88,6 +91,24 @@ public class TestProcessorGraph {
     graph2.addIntermediateStream(genStream(), "5", "5");
     graph2.addIntermediateStream(genStream(), "5", "7");
     graph2.addSink(genStream(), "7");
+
+    /**
+     * graph3 is a graph with self loops
+     * 1<->1 -> 2<->2
+     */
+    graph3 = new ProcessorGraph(null);
+    graph3.addSource(genStream(), "1");
+    graph3.addIntermediateStream(genStream(), "1", "1");
+    graph3.addIntermediateStream(genStream(), "1", "2");
+    graph3.addIntermediateStream(genStream(), "2", "2");
+
+    /**
+     * graph4 is a graph of single-loop node
+     * 1<->1
+     */
+    graph4 = new ProcessorGraph(null);
+    graph4.addSource(genStream(), "1");
+    graph4.addIntermediateStream(genStream(), "1", "1");
   }
 
   @Test
@@ -194,5 +215,16 @@ public class TestProcessorGraph {
     assertTrue(idxMap2.get("6") > idxMap2.get("1"));
     assertTrue(idxMap2.get("5") > idxMap2.get("4"));
     assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+
+    //test graph3
+    List<ProcessorNode> sortedNodes3 = graph3.topologicalSort();
+    assertTrue(sortedNodes3.size() == 2);
+    assertEquals(sortedNodes3.get(0).getId(), "1");
+    assertEquals(sortedNodes3.get(1).getId(), "2");
+
+    //test graph4
+    List<ProcessorNode> sortedNodes4 = graph4.topologicalSort();
+    assertTrue(sortedNodes4.size() == 1);
+    assertEquals(sortedNodes4.get(0).getId(), "1");
   }
 }
index 31cbe79..60dcd26 100644 (file)
@@ -47,7 +47,6 @@ import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import scala.Option;
 import scala.collection.JavaConversions;
@@ -575,7 +574,7 @@ public class TestAsyncRunLoop {
       });
 
     runLoop.run();
-    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
 
     verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
     assertEquals(3, task0.processed);
@@ -585,7 +584,6 @@ public class TestAsyncRunLoop {
   }
 
   @Test
-  @Ignore
   public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
     TestTask task0 = new TestTask(true, true, false);
 
@@ -631,6 +629,6 @@ public class TestAsyncRunLoop {
 
     runLoop.run();
 
-    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
   }
 }