Samza 1186: Rename Processor to Job
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 4 Apr 2017 01:24:33 +0000 (18:24 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 4 Apr 2017 01:24:33 +0000 (18:24 -0700)
Now we have the top level Samza application, and each stage is called a job, the previous introduced "processor" naming should be renamed as Job. This includes renaming PrcessorGraph to JobGraph, and ProcessorNode to JobNode, etc.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jmakes@apache.org>

Closes #109 from xinyuiscool/SAMZA-1186

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/JobGraph.java [moved from samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java with 72% similarity]
samza-core/src/main/java/org/apache/samza/execution/JobNode.java [moved from samza-core/src/main/java/org/apache/samza/execution/ProcessorNode.java with 75% similarity]
samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java [deleted file]

index d4f5b00..e4e24b4 100644 (file)
@@ -98,6 +98,7 @@ public abstract class ApplicationRunner {
    * Returns {@link ApplicationStatus#Running} if any of the jobs are running.
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
+   * @return the status of the application
    */
   public abstract ApplicationStatus status(StreamApplication streamApp);
 
index f9e44cf..47deecd 100644 (file)
@@ -57,26 +57,23 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
-    // create physical processors based on stream graph
-    ProcessorGraph processorGraph = createProcessorGraph(streamGraph);
+  public JobGraph plan(StreamGraph streamGraph) throws Exception {
+    // create physical job graph based on stream graph
+    JobGraph jobGraph = createJobGraph(streamGraph);
 
-    if (!processorGraph.getIntermediateStreams().isEmpty()) {
+    if (!jobGraph.getIntermediateStreams().isEmpty()) {
       // figure out the partitions for internal streams
-      calculatePartitions(streamGraph, processorGraph);
+      calculatePartitions(streamGraph, jobGraph);
     }
 
-    return processorGraph;
+    return jobGraph;
   }
 
   /**
    * Create the physical graph from StreamGraph
    */
-  /* package private */ ProcessorGraph createProcessorGraph(StreamGraph streamGraph) {
-    // For this phase, we are going to create a processor for the whole dag
-    String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name
-
-    ProcessorGraph processorGraph = new ProcessorGraph(config);
+  /* package private */ JobGraph createJobGraph(StreamGraph streamGraph) {
+    JobGraph jobGraph = new JobGraph(streamGraph, config);
     Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
     Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
@@ -84,46 +81,51 @@ public class ExecutionPlanner {
     sourceStreams.removeAll(intStreams);
     sinkStreams.removeAll(intStreams);
 
+    // For this phase, we have a single job node for the whole dag
+    String jobName = config.get(JobConfig.JOB_NAME());
+    String jobId = config.get(JobConfig.JOB_ID(), "1");
+    JobNode node = jobGraph.getOrCreateNode(jobName, jobId);
+
     // add sources
-    sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
+    sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
 
     // add sinks
-    sinkStreams.forEach(spec -> processorGraph.addSink(spec, processorId));
+    sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));
 
     // add intermediate streams
-    intStreams.forEach(spec -> processorGraph.addIntermediateStream(spec, processorId, processorId));
+    intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
 
-    processorGraph.validate();
+    jobGraph.validate();
 
-    return processorGraph;
+    return jobGraph;
   }
 
   /**
    * Figure out the number of partitions of all streams
    */
-  /* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+  /* package private */ void calculatePartitions(StreamGraph streamGraph, JobGraph jobGraph) {
     // fetch the external streams partition info
-    updateExistingPartitions(processorGraph, streamManager);
+    updateExistingPartitions(jobGraph, streamManager);
 
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(streamGraph, processorGraph);
+    calculateJoinInputPartitions(streamGraph, jobGraph);
 
     // calculate the partitions for the rest of intermediate streams
-    calculateIntStreamPartitions(processorGraph, config);
+    calculateIntStreamPartitions(jobGraph, config);
 
     // validate all the partitions are assigned
-    validatePartitions(processorGraph);
+    validatePartitions(jobGraph);
   }
 
   /**
    * Fetch the partitions of source/sink streams and update the StreamEdges.
-   * @param processorGraph ProcessorGraph
+   * @param jobGraph {@link JobGraph}
    * @param streamManager the {@StreamManager} to interface with the streams.
    */
-  /* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, StreamManager streamManager) {
+  /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
-    existingStreams.addAll(processorGraph.getSources());
-    existingStreams.addAll(processorGraph.getSinks());
+    existingStreams.addAll(jobGraph.getSources());
+    existingStreams.addAll(jobGraph.getSinks());
 
     Multimap<String, StreamEdge> systemToStreamEdges = HashMultimap.create();
     // group the StreamEdge(s) based on the system name
@@ -150,7 +152,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
+  /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, JobGraph jobGraph) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -165,7 +167,7 @@ public class ExecutionPlanner {
     Set<OperatorSpec> visited = new HashSet<>();
 
     streamGraph.getInStreams().entrySet().forEach(entry -> {
-        StreamEdge streamEdge = processorGraph.getOrCreateEdge(entry.getKey());
+        StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
             outputStreamToJoinSpec, joinQ, visited);
@@ -248,24 +250,24 @@ public class ExecutionPlanner {
     }
   }
 
-  private static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) {
+  private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
     int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
     if (partitions < 0) {
       // use the following simple algo to figure out the partitions
       // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
-      int maxInPartitions = maxPartition(processorGraph.getSources());
-      int maxOutPartitions = maxPartition(processorGraph.getSinks());
+      int maxInPartitions = maxPartition(jobGraph.getSources());
+      int maxOutPartitions = maxPartition(jobGraph.getSinks());
       partitions = Math.max(maxInPartitions, maxOutPartitions);
     }
-    for (StreamEdge edge : processorGraph.getIntermediateStreams()) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
       if (edge.getPartitionCount() <= 0) {
         edge.setPartitionCount(partitions);
       }
     }
   }
 
-  private static void validatePartitions(ProcessorGraph processorGraph) {
-    for (StreamEdge edge : processorGraph.getIntermediateStreams()) {
+  private static void validatePartitions(JobGraph jobGraph) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
       if (edge.getPartitionCount() <= 0) {
         throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getFormattedSystemStream()));
       }
@@ -31,44 +31,46 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The ProcessorGraph is the physical execution graph for a multi-stage Samza application.
- * It contains the topology of execution processors connected with source/sink/intermediate streams.
- * High level APIs are transformed into ProcessorGraph for planning, validation and execution.
+ * The JobGraph is the physical execution graph for a multi-stage Samza application.
+ * It contains the topology of jobs connected with source/sink/intermediate streams.
+ * High level APIs are transformed into JobGraph for planning, validation and execution.
  * Source/sink streams are external streams while intermediate streams are created and managed by Samza.
- * Note that intermediate streams are both the input and output of a ProcessorNode in ProcessorGraph.
+ * Note that intermediate streams are both the input and output of a JobNode in JobGraph.
  * So the graph may have cycles and it's not a DAG.
  */
-public class ProcessorGraph {
-  private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
+public class JobGraph {
+  private static final Logger log = LoggerFactory.getLogger(JobGraph.class);
 
-  private final Map<String, ProcessorNode> nodes = new HashMap<>();
+  private final Map<String, JobNode> nodes = new HashMap<>();
   private final Map<String, StreamEdge> edges = new HashMap<>();
   private final Set<StreamEdge> sources = new HashSet<>();
   private final Set<StreamEdge> sinks = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Config config;
+  private final StreamGraph streamGraph;
 
   /**
-   * The ProcessorGraph is only constructed by the {@link ExecutionPlanner}.
+   * The JobGraph is only constructed by the {@link ExecutionPlanner}.
    * @param config Config
    */
-  /* package private */ ProcessorGraph(Config config) {
+  /* package private */ JobGraph(StreamGraph streamGraph, Config config) {
+    this.streamGraph = streamGraph;
     this.config = config;
   }
 
   /**
-   * Add a source stream to a {@link ProcessorNode}
+   * Add a source stream to a {@link JobNode}
    * @param input source stream
-   * @param targetProcessorId id of the {@link ProcessorNode}
+   * @param node the job node that consumes from the source
    */
-  /* package private */ void addSource(StreamSpec input, String targetProcessorId) {
-    ProcessorNode node = getOrCreateProcessor(targetProcessorId);
+  /* package private */ void addSource(StreamSpec input, JobNode node) {
     StreamEdge edge = getOrCreateEdge(input);
     edge.addTargetNode(node);
     node.addInEdge(edge);
@@ -76,12 +78,11 @@ public class ProcessorGraph {
   }
 
   /**
-   * Add a sink stream to a {@link ProcessorNode}
+   * Add a sink stream to a {@link JobNode}
    * @param output sink stream
-   * @param sourceProcessorId id of the {@link ProcessorNode}
+   * @param node the job node that outputs to the sink
    */
-  /* package private */ void addSink(StreamSpec output, String sourceProcessorId) {
-    ProcessorNode node = getOrCreateProcessor(sourceProcessorId);
+  /* package private */ void addSink(StreamSpec output, JobNode node) {
     StreamEdge edge = getOrCreateEdge(output);
     edge.addSourceNode(node);
     node.addOutEdge(edge);
@@ -89,32 +90,32 @@ public class ProcessorGraph {
   }
 
   /**
-   * Add an intermediate stream from source to target {@link ProcessorNode}
+   * Add an intermediate stream from source to target {@link JobNode}
    * @param streamSpec intermediate stream
-   * @param sourceProcessorId id of the source {@link ProcessorNode}
-   * @param targetProcessorId id of the target {@link ProcessorNode}
+   * @param from the source node
+   * @param to the target node
    */
-  /* package private */ void addIntermediateStream(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
-    ProcessorNode sourceNode = getOrCreateProcessor(sourceProcessorId);
-    ProcessorNode targetNode = getOrCreateProcessor(targetProcessorId);
+  /* package private */ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
     StreamEdge edge = getOrCreateEdge(streamSpec);
-    edge.addSourceNode(sourceNode);
-    edge.addTargetNode(targetNode);
-    sourceNode.addOutEdge(edge);
-    targetNode.addInEdge(edge);
+    edge.addSourceNode(from);
+    edge.addTargetNode(to);
+    from.addOutEdge(edge);
+    to.addInEdge(edge);
     intermediateStreams.add(edge);
   }
 
   /**
-   * Get the {@link ProcessorNode} for an id. Create one if it does not exist.
-   * @param processorId id of the processor
-   * @return processor node
+   * Get the {@link JobNode}. Create one if it does not exist.
+   * @param jobName name of the job
+   * @param jobId id of the job
+   * @return
    */
-  /* package private */ProcessorNode getOrCreateProcessor(String processorId) {
-    ProcessorNode node = nodes.get(processorId);
+  /* package private */JobNode getOrCreateNode(String jobName, String jobId) {
+    String nodeId = JobNode.createId(jobName, jobId);
+    JobNode node = nodes.get(nodeId);
     if (node == null) {
-      node = new ProcessorNode(processorId, config);
-      nodes.put(processorId, node);
+      node = new JobNode(jobName, jobId, config);
+      nodes.put(nodeId, node);
     }
     return node;
   }
@@ -135,11 +136,11 @@ public class ProcessorGraph {
   }
 
   /**
-   * Returns the processors to be executed in the topological order
-   * @return unmodifiable list of {@link ProcessorNode}
+   * Returns the job nodes to be executed in the topological order
+   * @return unmodifiable list of {@link JobNode}
    */
-  public List<ProcessorNode> getProcessorNodes() {
-    List<ProcessorNode> sortedNodes = topologicalSort();
+  public List<JobNode> getJobNodes() {
+    List<JobNode> sortedNodes = topologicalSort();
     return Collections.unmodifiableList(sortedNodes);
   }
 
@@ -167,6 +168,14 @@ public class ProcessorGraph {
     return Collections.unmodifiableSet(intermediateStreams);
   }
 
+  /**
+   * Return the {@link StreamGraph}
+   * @return {@link StreamGraph}
+   */
+  public StreamGraph getStreamGraph() {
+    return this.streamGraph;
+  }
+
 
   /**
    * Validate the graph has the correct topology, meaning the sources are coming from external streams,
@@ -233,31 +242,31 @@ public class ProcessorGraph {
    */
   private void validateReachability() {
     // validate all nodes are reachable from the sources
-    final Set<ProcessorNode> reachable = findReachable();
+    final Set<JobNode> reachable = findReachable();
     if (reachable.size() != nodes.size()) {
-      Set<ProcessorNode> unreachable = new HashSet<>(nodes.values());
+      Set<JobNode> unreachable = new HashSet<>(nodes.values());
       unreachable.removeAll(reachable);
-      throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.",
-          String.join(", ", unreachable.stream().map(ProcessorNode::getId).collect(Collectors.toList()))));
+      throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.",
+          String.join(", ", unreachable.stream().map(JobNode::getId).collect(Collectors.toList()))));
     }
   }
 
   /**
    * Find the reachable set of nodes using BFS.
-   * @return reachable set of {@link ProcessorNode}
+   * @return reachable set of {@link JobNode}
    */
-  /* package private */ Set<ProcessorNode> findReachable() {
-    Queue<ProcessorNode> queue = new ArrayDeque<>();
-    Set<ProcessorNode> visited = new HashSet<>();
+  /* package private */ Set<JobNode> findReachable() {
+    Queue<JobNode> queue = new ArrayDeque<>();
+    Set<JobNode> visited = new HashSet<>();
 
     sources.forEach(source -> {
-        List<ProcessorNode> next = source.getTargetNodes();
+        List<JobNode> next = source.getTargetNodes();
         queue.addAll(next);
         visited.addAll(next);
       });
 
     while (!queue.isEmpty()) {
-      ProcessorNode node = queue.poll();
+      JobNode node = queue.poll();
       node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
           if (!visited.contains(target)) {
             visited.add(target);
@@ -272,17 +281,17 @@ public class ProcessorGraph {
   /**
    * An variation of Kahn's algorithm of topological sorting.
    * This algorithm also takes account of the simple loops in the graph
-   * @return topologically sorted {@link ProcessorNode}s
+   * @return topologically sorted {@link JobNode}s
    */
-  /* package private */ List<ProcessorNode> topologicalSort() {
-    Collection<ProcessorNode> pnodes = nodes.values();
+  /* package private */ List<JobNode> topologicalSort() {
+    Collection<JobNode> pnodes = nodes.values();
     if (pnodes.size() == 1) {
       return new ArrayList<>(pnodes);
     }
 
-    Queue<ProcessorNode> q = new ArrayDeque<>();
+    Queue<JobNode> q = new ArrayDeque<>();
     Map<String, Long> indegree = new HashMap<>();
-    Set<ProcessorNode> visited = new HashSet<>();
+    Set<JobNode> visited = new HashSet<>();
     pnodes.forEach(node -> {
         String nid = node.getId();
         //only count the degrees of intermediate streams
@@ -296,8 +305,8 @@ public class ProcessorGraph {
         }
       });
 
-    List<ProcessorNode> sortedNodes = new ArrayList<>();
-    Set<ProcessorNode> reachable = new HashSet<>();
+    List<JobNode> sortedNodes = new ArrayList<>();
+    Set<JobNode> reachable = new HashSet<>();
     while (sortedNodes.size() < pnodes.size()) {
       // Here we use indegree-based approach to implment Kahn's algorithm for topological sort
       // This approach will not change the graph itself during computation.
@@ -309,7 +318,7 @@ public class ProcessorGraph {
       // 4. loop 1-3 until no more nodes with indegree 0
       //
       while (!q.isEmpty()) {
-        ProcessorNode node = q.poll();
+        JobNode node = q.poll();
         sortedNodes.add(node);
         node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
             String nid = n.getId();
@@ -331,8 +340,8 @@ public class ProcessorGraph {
         if (!reachable.isEmpty()) {
           //find out the nodes with minimal input edge
           long min = Long.MAX_VALUE;
-          ProcessorNode minNode = null;
-          for (ProcessorNode node : reachable) {
+          JobNode minNode = null;
+          for (JobNode node : reachable) {
             Long degree = indegree.get(node.getId());
             if (degree < min) {
               min = degree;
@@ -345,7 +354,7 @@ public class ProcessorGraph {
         } else {
           // all the remaining nodes should be reachable from sources
           // start from sources again to find the next node that hasn't been visited
-          ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+          JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
               .filter(node -> !visited.contains(node))
               .findAny().get();
           q.add(nextNode);
@@ -35,21 +35,25 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
+ * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
  * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
- * A ProcessorNode contains the input/output, and the configs for physical execution.
+ * A JobNode contains the input/output, and the configs for physical execution.
  */
-public class ProcessorNode {
-  private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
-  private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
+public class JobNode {
+  private static final Logger log = LoggerFactory.getLogger(JobNode.class);
+  private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
 
+  private final String jobName;
+  private final String jobId;
   private final String id;
   private final List<StreamEdge> inEdges = new ArrayList<>();
   private final List<StreamEdge> outEdges = new ArrayList<>();
   private final Config config;
 
-  ProcessorNode(String id, Config config) {
-    this.id = id;
+  JobNode(String jobName, String jobId, Config config) {
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.id = createId(jobName, jobId);
     this.config = config;
   }
 
@@ -57,6 +61,14 @@ public class ProcessorNode {
     return id;
   }
 
+  public String getJobName() {
+    return jobName;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
   void addInEdge(StreamEdge in) {
     inEdges.add(in);
   }
@@ -75,22 +87,22 @@ public class ProcessorNode {
 
   public Config generateConfig() {
     Map<String, String> configs = new HashMap<>();
-    configs.put(JobConfig.JOB_NAME(), id);
+    configs.put(JobConfig.JOB_NAME(), jobName);
 
     List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList());
     configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
-    log.info("Processor {} has generated configs {}", id, configs);
+    log.info("Job {} has generated configs {}", jobName, configs);
 
-    String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id);
-    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+    String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
+    // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
     return Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix));
   }
 
   /**
    * This function extract the subset of configs from the full config, and use it to override the generated configs
-   * from the processor.
+   * from the job.
    * @param fullConfig full config
-   * @param generatedConfig config generated from the processor
+   * @param generatedConfig config generated for the job
    * @param configPrefix prefix to extract the subset of the config overrides
    * @return config that merges the generated configs and overrides
    */
@@ -113,4 +125,8 @@ public class ProcessorNode {
 
     return scopedConfig;
   }
+
+  static String createId(String jobName, String jobId) {
+    return String.format("%s-%s", jobName, jobId);
+  }
 }
index 3215a22..9596d0f 100644 (file)
@@ -27,16 +27,16 @@ import org.apache.samza.util.Util;
 
 
 /**
- * A StreamEdge connects the source {@link ProcessorNode}s to the target {@link ProcessorNode}s with a stream.
- * If it's a sink StreamEdge, the target ProcessorNode is empty.
- * If it's a source StreamEdge, the source ProcessorNode is empty.
+ * A StreamEdge connects the source {@link JobNode}s to the target {@link JobNode}s with a stream.
+ * If it's a sink StreamEdge, the target JobNode is empty.
+ * If it's a source StreamEdge, the source JobNode is empty.
  */
 public class StreamEdge {
   public static final int PARTITIONS_UNKNOWN = -1;
 
   private final StreamSpec streamSpec;
-  private final List<ProcessorNode> sourceNodes = new ArrayList<>();
-  private final List<ProcessorNode> targetNodes = new ArrayList<>();
+  private final List<JobNode> sourceNodes = new ArrayList<>();
+  private final List<JobNode> targetNodes = new ArrayList<>();
 
   private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
@@ -46,11 +46,11 @@ public class StreamEdge {
     this.name = Util.getNameFromSystemStream(getSystemStream());
   }
 
-  void addSourceNode(ProcessorNode sourceNode) {
+  void addSourceNode(JobNode sourceNode) {
     sourceNodes.add(sourceNode);
   }
 
-  void addTargetNode(ProcessorNode targetNode) {
+  void addTargetNode(JobNode targetNode) {
     targetNodes.add(targetNode);
   }
 
@@ -70,11 +70,11 @@ public class StreamEdge {
     return Util.getNameFromSystemStream(getSystemStream());
   }
 
-  List<ProcessorNode> getSourceNodes() {
+  List<JobNode> getSourceNodes() {
     return sourceNodes;
   }
 
-  List<ProcessorNode> getTargetNodes() {
+  List<JobNode> getTargetNodes() {
     return targetNodes;
   }
 
index 1444662..5a125a2 100644 (file)
@@ -53,6 +53,12 @@ public interface OperatorSpec<OM> {
   MessageStreamImpl<OM> getNextStream();
 
   /**
+   * Return the ID for this operator
+   * @return ID integer
+   */
+  int getOpId();
+
+  /**
    * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
    *
    * @param config  the {@link Config} object for this task
index 9fe493e..cd6c492 100644 (file)
@@ -26,8 +26,8 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.execution.ExecutionPlanner;
-import org.apache.samza.execution.ProcessorGraph;
-import org.apache.samza.execution.ProcessorNode;
+import org.apache.samza.execution.JobGraph;
+import org.apache.samza.execution.JobNode;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
@@ -60,19 +60,19 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   public void run(StreamApplication app) {
     try {
       // 1. initialize and plan
-      ProcessorGraph processorGraph = getExecutionPlan(app);
+      JobGraph jobGraph = getExecutionPlan(app);
 
       // 2. create the necessary streams
-      List<StreamSpec> streams = processorGraph.getIntermediateStreams().stream()
+      List<StreamSpec> streams = jobGraph.getIntermediateStreams().stream()
           .map(streamEdge -> streamEdge.getStreamSpec())
           .collect(Collectors.toList());
       streamManager.createStreams(streams);
 
       // 3. submit jobs for remote execution
-      processorGraph.getProcessorNodes().forEach(processor -> {
-          Config processorConfig = processor.generateConfig();
-          log.info("Starting processor {} with config {}", processor.getId(), processorConfig);
-          JobRunner runner = new JobRunner(processorConfig);
+      jobGraph.getJobNodes().forEach(job -> {
+          Config jobConfig = job.generateConfig();
+          log.info("Starting job {} with config {}", job.getId(), jobConfig);
+          JobRunner runner = new JobRunner(jobConfig);
           runner.run(true);
         });
     } catch (Throwable t) {
@@ -83,12 +83,12 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication app) {
     try {
-      ProcessorGraph processorGraph = getExecutionPlan(app);
+      JobGraph jobGraph = getExecutionPlan(app);
 
-      processorGraph.getProcessorNodes().forEach(processor -> {
-          Config processorConfig = processor.generateConfig();
-          log.info("Killing processor {}", processor.getId());
-          JobRunner runner = new JobRunner(processorConfig);
+      jobGraph.getJobNodes().forEach(job -> {
+          Config jobConfig = job.generateConfig();
+          log.info("Killing job {}", job.getId());
+          JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
     } catch (Throwable t) {
@@ -102,12 +102,12 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       boolean finished = false;
       boolean unsuccessfulFinish = false;
 
-      ProcessorGraph processorGraph = getExecutionPlan(app);
-      for (ProcessorNode processor : processorGraph.getProcessorNodes()) {
-        Config processorConfig = processor.generateConfig();
-        JobRunner runner = new JobRunner(processorConfig);
+      JobGraph jobGraph = getExecutionPlan(app);
+      for (JobNode job : jobGraph.getJobNodes()) {
+        Config jobConfig = job.generateConfig();
+        JobRunner runner = new JobRunner(jobConfig);
         ApplicationStatus status = runner.status();
-        log.debug("Status is {} for processor {}", new Object[]{status, processor.getId()});
+        log.debug("Status is {} for jopb {}", new Object[]{status, job.getId()});
 
         switch (status) {
           case Running:
@@ -133,7 +133,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
     }
   }
 
-  private ProcessorGraph getExecutionPlan(StreamApplication app) throws Exception {
+  private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
     // build stream graph
     StreamGraph streamGraph = new StreamGraphImpl(this, config);
     app.init(streamGraph, config);
index dc828d9..e524ba1 100644 (file)
@@ -227,26 +227,26 @@ public class TestExecutionPlanner {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
 
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    assertTrue(processorGraph.getSources().size() == 3);
-    assertTrue(processorGraph.getSinks().size() == 2);
-    assertTrue(processorGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    assertTrue(jobGraph.getSources().size() == 3);
+    assertTrue(jobGraph.getSinks().size() == 2);
+    assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
   }
 
   @Test
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
-    ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
-    assertTrue(processorGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
-    assertTrue(processorGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
-    assertTrue(processorGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
-    assertTrue(processorGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
-    assertTrue(processorGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
+    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+    assertTrue(jobGraph.getOrCreateEdge(input1).getPartitionCount() == 64);
+    assertTrue(jobGraph.getOrCreateEdge(input2).getPartitionCount() == 16);
+    assertTrue(jobGraph.getOrCreateEdge(input3).getPartitionCount() == 32);
+    assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
+    assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
 
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == -1);
       });
   }
@@ -255,13 +255,13 @@ public class TestExecutionPlanner {
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createStreamGraphWithJoin();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
-    ExecutionPlanner.updateExistingPartitions(processorGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph);
+    ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
+    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, jobGraph);
 
     // the partitions should be the same as input1
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == 64);
       });
   }
@@ -274,11 +274,11 @@ public class TestExecutionPlanner {
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
     StreamGraph streamGraph = createSimpleGraph();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, processorGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, jobGraph);
 
     // the partitions should be the same as input1
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS);
       });
   }
@@ -287,11 +287,11 @@ public class TestExecutionPlanner {
   public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     StreamGraph streamGraph = createSimpleGraph();
-    ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, processorGraph);
+    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    planner.calculatePartitions(streamGraph, jobGraph);
 
     // the partitions should be the same as input1
-    processorGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreams().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
       });
   }
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
new file mode 100644 (file)
index 0000000..d829b64
--- /dev/null
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestJobGraph {
+
+  JobGraph graph1;
+  JobGraph graph2;
+  JobGraph graph3;
+  JobGraph graph4;
+  int streamSeq = 0;
+
+  private StreamSpec genStream() {
+    ++streamSeq;
+
+    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
+  }
+
+  /**
+   * graph1 is the example graph from wikipedia
+   *
+   * 5   7   3
+   * | / | / |
+   * v   v   |
+   * 11  8   |
+   * | \X   /
+   * v v \v
+   * 2 9 10
+   */
+  private void createGraph1() {
+    graph1 = new JobGraph(null, null);
+
+    JobNode n2 = graph1.getOrCreateNode("2", "1");
+    JobNode n3 = graph1.getOrCreateNode("3", "1");
+    JobNode n5 = graph1.getOrCreateNode("5", "1");
+    JobNode n7 = graph1.getOrCreateNode("7", "1");
+    JobNode n8 = graph1.getOrCreateNode("8", "1");
+    JobNode n9 = graph1.getOrCreateNode("9", "1");
+    JobNode n10 = graph1.getOrCreateNode("10", "1");
+    JobNode n11 = graph1.getOrCreateNode("11", "1");
+
+    graph1.addSource(genStream(), n5);
+    graph1.addSource(genStream(), n7);
+    graph1.addSource(genStream(), n3);
+    graph1.addIntermediateStream(genStream(), n5, n11);
+    graph1.addIntermediateStream(genStream(), n7, n11);
+    graph1.addIntermediateStream(genStream(), n7, n8);
+    graph1.addIntermediateStream(genStream(), n3, n8);
+    graph1.addIntermediateStream(genStream(), n11, n2);
+    graph1.addIntermediateStream(genStream(), n11, n9);
+    graph1.addIntermediateStream(genStream(), n8, n9);
+    graph1.addIntermediateStream(genStream(), n11, n10);
+    graph1.addSink(genStream(), n2);
+    graph1.addSink(genStream(), n9);
+    graph1.addSink(genStream(), n10);
+  }
+
+  /**
+   * graph2 is a graph with a loop
+   * 1 -> 2 -> 3 -> 4 -> 5 -> 7
+   *      |<---6 <--|    <>
+   */
+  private void createGraph2() {
+    graph2 = new JobGraph(null, null);
+
+    JobNode n1 = graph2.getOrCreateNode("1", "1");
+    JobNode n2 = graph2.getOrCreateNode("2", "1");
+    JobNode n3 = graph2.getOrCreateNode("3", "1");
+    JobNode n4 = graph2.getOrCreateNode("4", "1");
+    JobNode n5 = graph2.getOrCreateNode("5", "1");
+    JobNode n6 = graph2.getOrCreateNode("6", "1");
+    JobNode n7 = graph2.getOrCreateNode("7", "1");
+
+    graph2.addSource(genStream(), n1);
+    graph2.addIntermediateStream(genStream(), n1, n2);
+    graph2.addIntermediateStream(genStream(), n2, n3);
+    graph2.addIntermediateStream(genStream(), n3, n4);
+    graph2.addIntermediateStream(genStream(), n4, n5);
+    graph2.addIntermediateStream(genStream(), n4, n6);
+    graph2.addIntermediateStream(genStream(), n6, n2);
+    graph2.addIntermediateStream(genStream(), n5, n5);
+    graph2.addIntermediateStream(genStream(), n5, n7);
+    graph2.addSink(genStream(), n7);
+  }
+
+  /**
+   * graph3 is a graph with two self loops
+   * 1<->1 -> 2<->2
+   */
+  private void createGraph3() {
+    graph3 = new JobGraph(null, null);
+
+    JobNode n1 = graph3.getOrCreateNode("1", "1");
+    JobNode n2 = graph3.getOrCreateNode("2", "1");
+
+    graph3.addSource(genStream(), n1);
+    graph3.addIntermediateStream(genStream(), n1, n1);
+    graph3.addIntermediateStream(genStream(), n1, n2);
+    graph3.addIntermediateStream(genStream(), n2, n2);
+  }
+
+  /**
+   * graph4 is a graph of single-loop node
+   * 1<->1
+   */
+  private void createGraph4() {
+    graph4 = new JobGraph(null, null);
+
+    JobNode n1 = graph4.getOrCreateNode("1", "1");
+
+    graph4.addSource(genStream(), n1);
+    graph4.addIntermediateStream(genStream(), n1, n1);
+  }
+
+  @Before
+  public void setup() {
+    createGraph1();
+    createGraph2();
+    createGraph3();
+    createGraph4();
+  }
+
+  @Test
+  public void testAddSource() {
+    JobGraph graph = new JobGraph(null, null);
+
+    /**
+     * s1 -> 1
+     * s2 ->|
+     *
+     * s3 -> 2
+     *   |-> 3
+     */
+    JobNode n1 = graph.getOrCreateNode("1", "1");
+    JobNode n2 = graph.getOrCreateNode("2", "1");
+    JobNode n3 = graph.getOrCreateNode("3", "1");
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, n1);
+    graph.addSource(s2, n1);
+    graph.addSource(s3, n2);
+    graph.addSource(s3, n3);
+
+    assertTrue(graph.getSources().size() == 3);
+
+    assertTrue(graph.getOrCreateNode("1", "1").getInEdges().size() == 2);
+    assertTrue(graph.getOrCreateNode("2", "1").getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("3", "1").getInEdges().size() == 1);
+
+    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
+  }
+
+  @Test
+  public void testAddSink() {
+    /**
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    JobGraph graph = new JobGraph(null, null);
+    JobNode n1 = graph.getOrCreateNode("1", "1");
+    JobNode n2 = graph.getOrCreateNode("2", "1");
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSink(s1, n1);
+    graph.addSink(s2, n2);
+    graph.addSink(s3, n2);
+
+    assertTrue(graph.getSinks().size() == 3);
+    assertTrue(graph.getOrCreateNode("1", "1").getOutEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("2", "1").getOutEdges().size() == 2);
+
+    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
+    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
+    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
+  }
+
+  @Test
+  public void testReachable() {
+    Set<JobNode> reachable1 = graph1.findReachable();
+    assertTrue(reachable1.size() == 8);
+
+    Set<JobNode> reachable2 = graph2.findReachable();
+    assertTrue(reachable2.size() == 7);
+  }
+
+  @Test
+  public void testTopologicalSort() {
+
+    // test graph1
+    List<JobNode> sortedNodes1 = graph1.topologicalSort();
+    Map<String, Integer> idxMap1 = new HashMap<>();
+    for (int i = 0; i < sortedNodes1.size(); i++) {
+      idxMap1.put(sortedNodes1.get(i).getJobName(), i);
+    }
+
+    assertTrue(idxMap1.size() == 8);
+    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
+    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
+    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
+    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
+    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
+    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
+
+    // test graph2
+    List<JobNode> sortedNodes2 = graph2.topologicalSort();
+    Map<String, Integer> idxMap2 = new HashMap<>();
+    for (int i = 0; i < sortedNodes2.size(); i++) {
+      idxMap2.put(sortedNodes2.get(i).getJobName(), i);
+    }
+
+    assertTrue(idxMap2.size() == 7);
+    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
+    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
+    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+
+    //test graph3
+    List<JobNode> sortedNodes3 = graph3.topologicalSort();
+    assertTrue(sortedNodes3.size() == 2);
+    assertEquals(sortedNodes3.get(0).getJobName(), "1");
+    assertEquals(sortedNodes3.get(1).getJobName(), "2");
+
+    //test graph4
+    List<JobNode> sortedNodes4 = graph4.topologicalSort();
+    assertTrue(sortedNodes4.size() == 1);
+    assertEquals(sortedNodes4.get(0).getJobName(), "1");
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
deleted file mode 100644 (file)
index 2f89d91..0000000
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.execution;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-public class TestProcessorGraph {
-
-  ProcessorGraph graph1;
-  ProcessorGraph graph2;
-  ProcessorGraph graph3;
-  ProcessorGraph graph4;
-  int streamSeq = 0;
-
-  private StreamSpec genStream() {
-    ++streamSeq;
-
-    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
-  }
-
-  @Before
-  public void setup() {
-    /**
-     * graph1 is the example graph from wikipedia
-     *
-     * 5   7   3
-     * | / | / |
-     * v   v   |
-     * 11  8   |
-     * | \X   /
-     * v v \v
-     * 2 9 10
-     */
-    // init graph1
-    graph1 = new ProcessorGraph(null);
-    graph1.addSource(genStream(), "5");
-    graph1.addSource(genStream(), "7");
-    graph1.addSource(genStream(), "3");
-    graph1.addIntermediateStream(genStream(), "5", "11");
-    graph1.addIntermediateStream(genStream(), "7", "11");
-    graph1.addIntermediateStream(genStream(), "7", "8");
-    graph1.addIntermediateStream(genStream(), "3", "8");
-    graph1.addIntermediateStream(genStream(), "11", "2");
-    graph1.addIntermediateStream(genStream(), "11", "9");
-    graph1.addIntermediateStream(genStream(), "8", "9");
-    graph1.addIntermediateStream(genStream(), "11", "10");
-    graph1.addSink(genStream(), "2");
-    graph1.addSink(genStream(), "9");
-    graph1.addSink(genStream(), "10");
-
-    /**
-     * graph2 is a graph with a loop
-     * 1 -> 2 -> 3 -> 4 -> 5 -> 7
-     *      |<---6 <--|    <>
-     */
-    graph2 = new ProcessorGraph(null);
-    graph2.addSource(genStream(), "1");
-    graph2.addIntermediateStream(genStream(), "1", "2");
-    graph2.addIntermediateStream(genStream(), "2", "3");
-    graph2.addIntermediateStream(genStream(), "3", "4");
-    graph2.addIntermediateStream(genStream(), "4", "5");
-    graph2.addIntermediateStream(genStream(), "4", "6");
-    graph2.addIntermediateStream(genStream(), "6", "2");
-    graph2.addIntermediateStream(genStream(), "5", "5");
-    graph2.addIntermediateStream(genStream(), "5", "7");
-    graph2.addSink(genStream(), "7");
-
-    /**
-     * graph3 is a graph with self loops
-     * 1<->1 -> 2<->2
-     */
-    graph3 = new ProcessorGraph(null);
-    graph3.addSource(genStream(), "1");
-    graph3.addIntermediateStream(genStream(), "1", "1");
-    graph3.addIntermediateStream(genStream(), "1", "2");
-    graph3.addIntermediateStream(genStream(), "2", "2");
-
-    /**
-     * graph4 is a graph of single-loop node
-     * 1<->1
-     */
-    graph4 = new ProcessorGraph(null);
-    graph4.addSource(genStream(), "1");
-    graph4.addIntermediateStream(genStream(), "1", "1");
-  }
-
-  @Test
-  public void testAddSource() {
-    ProcessorGraph graph = new ProcessorGraph(null);
-
-    /**
-     * s1 -> 1
-     * s2 ->|
-     *
-     * s3 -> 2
-     *   |-> 3
-     */
-    StreamSpec s1 = genStream();
-    StreamSpec s2 = genStream();
-    StreamSpec s3 = genStream();
-    graph.addSource(s1, "1");
-    graph.addSource(s2, "1");
-    graph.addSource(s3, "2");
-    graph.addSource(s3, "3");
-
-    assertTrue(graph.getSources().size() == 3);
-
-    assertTrue(graph.getOrCreateProcessor("1").getInEdges().size() == 2);
-    assertTrue(graph.getOrCreateProcessor("2").getInEdges().size() == 1);
-    assertTrue(graph.getOrCreateProcessor("3").getInEdges().size() == 1);
-
-    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2);
-  }
-
-  @Test
-  public void testAddSink() {
-    /**
-     * 1 -> s1
-     * 2 -> s2
-     * 2 -> s3
-     */
-    StreamSpec s1 = genStream();
-    StreamSpec s2 = genStream();
-    StreamSpec s3 = genStream();
-    ProcessorGraph graph = new ProcessorGraph(null);
-    graph.addSink(s1, "1");
-    graph.addSink(s2, "2");
-    graph.addSink(s3, "2");
-
-    assertTrue(graph.getSinks().size() == 3);
-    assertTrue(graph.getOrCreateProcessor("1").getOutEdges().size() == 1);
-    assertTrue(graph.getOrCreateProcessor("2").getOutEdges().size() == 2);
-
-    assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0);
-    assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1);
-    assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0);
-  }
-
-  @Test
-  public void testReachable() {
-    Set<ProcessorNode> reachable1 = graph1.findReachable();
-    assertTrue(reachable1.size() == 8);
-
-    Set<ProcessorNode> reachable2 = graph2.findReachable();
-    assertTrue(reachable2.size() == 7);
-  }
-
-  @Test
-  public void testTopologicalSort() {
-
-    // test graph1
-    List<ProcessorNode> sortedNodes1 = graph1.topologicalSort();
-    Map<String, Integer> idxMap1 = new HashMap<>();
-    for (int i = 0; i < sortedNodes1.size(); i++) {
-      idxMap1.put(sortedNodes1.get(i).getId(), i);
-    }
-
-    assertTrue(idxMap1.size() == 8);
-    assertTrue(idxMap1.get("11") > idxMap1.get("5"));
-    assertTrue(idxMap1.get("11") > idxMap1.get("7"));
-    assertTrue(idxMap1.get("8") > idxMap1.get("7"));
-    assertTrue(idxMap1.get("8") > idxMap1.get("3"));
-    assertTrue(idxMap1.get("2") > idxMap1.get("11"));
-    assertTrue(idxMap1.get("9") > idxMap1.get("8"));
-    assertTrue(idxMap1.get("9") > idxMap1.get("11"));
-    assertTrue(idxMap1.get("10") > idxMap1.get("11"));
-    assertTrue(idxMap1.get("10") > idxMap1.get("3"));
-
-    // test graph2
-    List<ProcessorNode> sortedNodes2 = graph2.topologicalSort();
-    Map<String, Integer> idxMap2 = new HashMap<>();
-    for (int i = 0; i < sortedNodes2.size(); i++) {
-      idxMap2.put(sortedNodes2.get(i).getId(), i);
-    }
-
-    assertTrue(idxMap2.size() == 7);
-    assertTrue(idxMap2.get("2") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("3") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("4") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("6") > idxMap2.get("1"));
-    assertTrue(idxMap2.get("5") > idxMap2.get("4"));
-    assertTrue(idxMap2.get("7") > idxMap2.get("5"));
-
-    //test graph3
-    List<ProcessorNode> sortedNodes3 = graph3.topologicalSort();
-    assertTrue(sortedNodes3.size() == 2);
-    assertEquals(sortedNodes3.get(0).getId(), "1");
-    assertEquals(sortedNodes3.get(1).getId(), "2");
-
-    //test graph4
-    List<ProcessorNode> sortedNodes4 = graph4.topologicalSort();
-    assertTrue(sortedNodes4.size() == 1);
-    assertEquals(sortedNodes4.get(0).getId(), "1");
-  }
-}