SAMZA-1178: Generate JSON from StreamPlan
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Thu, 6 Apr 2017 16:58:38 +0000 (09:58 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Thu, 6 Apr 2017 16:58:38 +0000 (09:58 -0700)
As the first step to visualize the StreamGraph/Plan, this patch generates a json representation of it. For the example StreamGraph in TestPlanJsonGenerator, here is the json that is generated (https://github.com/xinyuiscool/examples/blob/master/example_plan.json). This patch also introduces the immutable StreamPlan and did some code cleanup.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jmakes@apache.org>

Closes #110 from xinyuiscool/SAMZA-1178

samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/execution/JobNode.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java [new file with mode: 0644]

diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java
new file mode 100644 (file)
index 0000000..6e2b4c6
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.util.List;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.system.StreamSpec;
+
+
+/**
+ * This interface represents Samza {@link org.apache.samza.application.StreamApplication}
+ * plans for physical execution.
+ */
+public interface ExecutionPlan {
+
+  /**
+   * Returns the configs for single stage job, in the order of topologically sort.
+   * @return list of job configs
+   */
+  List<JobConfig> getJobConfigs();
+
+  /**
+   * Returns the intermediate streams that need to be created.
+   * @return intermediate {@link StreamSpec}s
+   */
+  List<StreamSpec> getIntermediateStreams();
+
+  /**
+   * Returns the JSON representation of the plan for visualization
+   * @return json string
+   * @throws Exception exception
+   */
+  String getPlanAsJson() throws Exception;
+}
index be807e9..ac39eb8 100644 (file)
@@ -57,11 +57,11 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public JobGraph plan(StreamGraphImpl streamGraph) throws Exception {
+  public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
     // create physical job graph based on stream graph
     JobGraph jobGraph = createJobGraph(streamGraph);
 
-    if (!jobGraph.getIntermediateStreams().isEmpty()) {
+    if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
       // figure out the partitions for internal streams
       calculatePartitions(streamGraph, jobGraph);
     }
@@ -73,7 +73,7 @@ public class ExecutionPlanner {
    * Create the physical graph from StreamGraph
    */
   /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
-    JobGraph jobGraph = new JobGraph(streamGraph, config);
+    JobGraph jobGraph = new JobGraph(config);
     Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputStreams().keySet());
     Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
@@ -84,7 +84,7 @@ public class ExecutionPlanner {
     // For this phase, we have a single job node for the whole dag
     String jobName = config.get(JobConfig.JOB_NAME());
     String jobId = config.get(JobConfig.JOB_ID(), "1");
-    JobNode node = jobGraph.getOrCreateNode(jobName, jobId);
+    JobNode node = jobGraph.getOrCreateNode(jobName, jobId, streamGraph);
 
     // add sources
     sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
@@ -259,7 +259,7 @@ public class ExecutionPlanner {
       int maxOutPartitions = maxPartition(jobGraph.getSinks());
       partitions = Math.max(maxInPartitions, maxOutPartitions);
     }
-    for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
         edge.setPartitionCount(partitions);
       }
@@ -267,7 +267,7 @@ public class ExecutionPlanner {
   }
 
   private static void validatePartitions(JobGraph jobGraph) {
-    for (StreamEdge edge : jobGraph.getIntermediateStreams()) {
+    for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
         throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getFormattedSystemStream()));
       }
index 5c9f037..ff5fbdf 100644 (file)
@@ -31,7 +31,8 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * Note that intermediate streams are both the input and output of a JobNode in JobGraph.
  * So the graph may have cycles and it's not a DAG.
  */
-public class JobGraph {
+/* package private */ class JobGraph implements ExecutionPlan {
   private static final Logger log = LoggerFactory.getLogger(JobGraph.class);
 
   private final Map<String, JobNode> nodes = new HashMap<>();
@@ -54,23 +55,49 @@ public class JobGraph {
   private final Set<StreamEdge> sinks = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Config config;
-  private final StreamGraph streamGraph;
+  private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
 
   /**
    * The JobGraph is only constructed by the {@link ExecutionPlanner}.
    * @param config Config
    */
-  /* package private */ JobGraph(StreamGraph streamGraph, Config config) {
-    this.streamGraph = streamGraph;
+  JobGraph(Config config) {
     this.config = config;
   }
 
   /**
+   * Returns the configs for single stage job, in the order of topologically sort.
+   * @return list of job configs
+   */
+  public List<JobConfig> getJobConfigs() {
+    return getJobNodes().stream().map(JobNode::generateConfig).collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the intermediate streams that need to be created.
+   * @return intermediate {@link StreamSpec}s
+   */
+  public List<StreamSpec> getIntermediateStreams() {
+    return getIntermediateStreamEdges().stream()
+        .map(streamEdge -> streamEdge.getStreamSpec())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the JSON representation of the plan for visualization
+   * @return json string
+   * @throws Exception
+   */
+  public String getPlanAsJson() throws Exception {
+    return jsonGenerator.toJson(this);
+  }
+
+  /**
    * Add a source stream to a {@link JobNode}
    * @param input source stream
    * @param node the job node that consumes from the source
    */
-  /* package private */ void addSource(StreamSpec input, JobNode node) {
+  void addSource(StreamSpec input, JobNode node) {
     StreamEdge edge = getOrCreateEdge(input);
     edge.addTargetNode(node);
     node.addInEdge(edge);
@@ -82,7 +109,7 @@ public class JobGraph {
    * @param output sink stream
    * @param node the job node that outputs to the sink
    */
-  /* package private */ void addSink(StreamSpec output, JobNode node) {
+  void addSink(StreamSpec output, JobNode node) {
     StreamEdge edge = getOrCreateEdge(output);
     edge.addSourceNode(node);
     node.addOutEdge(edge);
@@ -95,7 +122,7 @@ public class JobGraph {
    * @param from the source node
    * @param to the target node
    */
-  /* package private */ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
+  void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
     StreamEdge edge = getOrCreateEdge(streamSpec);
     edge.addSourceNode(from);
     edge.addTargetNode(to);
@@ -110,11 +137,11 @@ public class JobGraph {
    * @param jobId id of the job
    * @return
    */
-  /* package private */JobNode getOrCreateNode(String jobName, String jobId) {
+  JobNode getOrCreateNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
     String nodeId = JobNode.createId(jobName, jobId);
     JobNode node = nodes.get(nodeId);
     if (node == null) {
-      node = new JobNode(jobName, jobId, config);
+      node = new JobNode(jobName, jobId, streamGraph, config);
       nodes.put(nodeId, node);
     }
     return node;
@@ -125,7 +152,7 @@ public class JobGraph {
    * @param streamSpec spec of the StreamEdge
    * @return stream edge
    */
-  /* package private */StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
+  StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
@@ -139,7 +166,7 @@ public class JobGraph {
    * Returns the job nodes to be executed in the topological order
    * @return unmodifiable list of {@link JobNode}
    */
-  public List<JobNode> getJobNodes() {
+  List<JobNode> getJobNodes() {
     List<JobNode> sortedNodes = topologicalSort();
     return Collections.unmodifiableList(sortedNodes);
   }
@@ -148,7 +175,7 @@ public class JobGraph {
    * Returns the source streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  public Set<StreamEdge> getSources() {
+  Set<StreamEdge> getSources() {
     return Collections.unmodifiableSet(sources);
   }
 
@@ -156,7 +183,7 @@ public class JobGraph {
    * Return the sink streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  public Set<StreamEdge> getSinks() {
+  Set<StreamEdge> getSinks() {
     return Collections.unmodifiableSet(sinks);
   }
 
@@ -164,25 +191,16 @@ public class JobGraph {
    * Return the intermediate streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  public Set<StreamEdge> getIntermediateStreams() {
+  Set<StreamEdge> getIntermediateStreamEdges() {
     return Collections.unmodifiableSet(intermediateStreams);
   }
 
   /**
-   * Return the {@link StreamGraph}
-   * @return {@link StreamGraph}
-   */
-  public StreamGraph getStreamGraph() {
-    return this.streamGraph;
-  }
-
-
-  /**
    * Validate the graph has the correct topology, meaning the sources are coming from external streams,
    * sinks are going to external streams, and the nodes are connected with intermediate streams.
    * Also validate all the nodes are reachable from the sources.
    */
-  public void validate() {
+  void validate() {
     validateSources();
     validateSinks();
     validateInternalStreams();
@@ -255,7 +273,7 @@ public class JobGraph {
    * Find the reachable set of nodes using BFS.
    * @return reachable set of {@link JobNode}
    */
-  /* package private */ Set<JobNode> findReachable() {
+  Set<JobNode> findReachable() {
     Queue<JobNode> queue = new ArrayDeque<>();
     Set<JobNode> visited = new HashSet<>();
 
@@ -283,7 +301,7 @@ public class JobGraph {
    * This algorithm also takes account of the simple loops in the graph
    * @return topologically sorted {@link JobNode}s
    */
-  /* package private */ List<JobNode> topologicalSort() {
+  List<JobNode> topologicalSort() {
     Collection<JobNode> pnodes = nodes.values();
     if (pnodes.size() == 1) {
       return new ArrayList<>(pnodes);
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
new file mode 100644 (file)
index 0000000..317616c
--- /dev/null
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * This class generates the JSON representation of the {@link JobGraph}.
+ */
+public class JobGraphJsonGenerator {
+
+  /**
+   * This class provides the necessary connection of operators for traversal.
+   */
+  static abstract class Traversable {
+    @JsonProperty("NextOperatorIds")
+    Set<Integer>  nextOperatorIds = new HashSet<>();
+  }
+
+  static final class OperatorJson extends Traversable {
+    @JsonProperty("OpCode")
+    String opCode;
+    @JsonProperty("OpId")
+    int opId;
+    @JsonProperty("OutputStreamId")
+    String outputStreamId;
+    @JsonProperty("PairedOpId")
+    int pairedOpId = -1;  //for join operator, we will have a pair nodes for two partial joins
+  }
+
+  static final class StreamSpecJson {
+    @JsonProperty("Id")
+    String id;
+    @JsonProperty("SystemName")
+    String systemName;
+    @JsonProperty("PhysicalName")
+    String physicalName;
+    @JsonProperty("PartitionCount")
+    int partitionCount;
+  }
+
+  static final class StreamEdgeJson {
+    @JsonProperty("StreamSpec")
+    StreamSpecJson streamSpec;
+  }
+
+  static final class OperatorGraphJson {
+    @JsonProperty("InputStreams")
+    List<InputStreamJson> inputStreams;
+    @JsonProperty("Operators")
+    Map<Integer, OperatorJson> operators = new HashMap<>();
+  }
+
+  static final class InputStreamJson extends Traversable {
+    @JsonProperty("StreamId")
+    String streamId;
+  }
+
+  static final class JobNodeJson {
+    @JsonProperty("JobName")
+    String jobName;
+    @JsonProperty("JobId")
+    String jobId;
+    @JsonProperty("OperatorGraph")
+    OperatorGraphJson operatorGraph;
+  }
+
+  static final class JobGraphJson {
+    @JsonProperty("Jobs")
+    List<JobNodeJson> jobs;
+    @JsonProperty("Streams")
+    Map<String, StreamEdgeJson> streams;
+  }
+
+  // Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they
+  // will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join
+  // (who register first in the map wins).
+  Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>();
+
+  /**
+   * Returns the JSON representation of a {@link JobGraph}
+   * @param jobGraph {@link JobGraph}
+   * @return JSON of the graph
+   * @throws Exception exception during creating JSON
+   */
+  /* package private */ String toJson(JobGraph jobGraph) throws Exception {
+    JobGraphJson jobGraphJson = new JobGraphJson();
+
+    // build StreamEdge JSON
+    jobGraphJson.streams = new HashMap<>();
+    jobGraph.getSources().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
+    jobGraph.getSinks().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
+    jobGraph.getIntermediateStreamEdges().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams));
+
+    jobGraphJson.jobs = jobGraph.getJobNodes().stream()
+        .map(jobNode -> buildJobNodeJson(jobNode, jobGraphJson.streams))
+        .collect(Collectors.toList());
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, jobGraphJson);
+    return new String(out.toByteArray());
+  }
+
+  /**
+   * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job
+   * @param jobNode job node in the {@link JobGraph}
+   * @param streamEdges map of {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
+   */
+  private JobNodeJson buildJobNodeJson(JobNode jobNode, Map<String, StreamEdgeJson> streamEdges) {
+    JobNodeJson job = new JobNodeJson();
+    job.jobName = jobNode.getJobName();
+    job.jobId = jobNode.getJobId();
+    job.operatorGraph = buildOperatorGraphJson(jobNode);
+    return job;
+  }
+
+  /**
+   * Traverse the {@StreamGraph} and build the operator graph JSON POJO.
+   * @param jobNode job node in the {@link JobGraph}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
+   */
+  private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
+    OperatorGraphJson opGraph = new OperatorGraphJson();
+    opGraph.inputStreams = new ArrayList<>();
+    jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> {
+        InputStreamJson inputJson = new InputStreamJson();
+        inputJson.streamId = streamSpec.getId();
+        opGraph.inputStreams.add(inputJson);
+        updateOperatorGraphJson((MessageStreamImpl) stream, inputJson, opGraph);
+      });
+    return opGraph;
+  }
+
+  /**
+   * Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO.
+   * @param messageStream input
+   * @param parent parent node in the traveral
+   * @param opGraph operator graph to build
+   */
+  private void updateOperatorGraphJson(MessageStreamImpl messageStream, Traversable parent, OperatorGraphJson opGraph) {
+    Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs();
+    specs.forEach(opSpec -> {
+        parent.nextOperatorIds.add(opSpec.getOpId());
+
+        OperatorJson opJson = getOrCreateOperatorJson(opSpec, opGraph);
+        if (opSpec instanceof SinkOperatorSpec) {
+          opJson.outputStreamId = ((SinkOperatorSpec) opSpec).getOutputStream().getStreamSpec().getId();
+        } else if (opSpec.getNextStream() != null) {
+          updateOperatorGraphJson(opSpec.getNextStream(), opJson, opGraph);
+        }
+      });
+  }
+
+  /**
+   * Get or create the JSON POJO for an operator.
+   * @param opSpec {@link OperatorSpec}
+   * @param opGraph {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorJson}
+   */
+  private OperatorJson getOrCreateOperatorJson(OperatorSpec opSpec, OperatorGraphJson opGraph) {
+    Map<Integer, OperatorJson> operators = opGraph.operators;
+    OperatorJson opJson = operators.get(opSpec.getOpId());
+    if (opJson == null) {
+      opJson = new OperatorJson();
+      opJson.opCode = opSpec.getOpCode().name();
+      opJson.opId = opSpec.getOpId();
+      operators.put(opSpec.getOpId(), opJson);
+    }
+
+    if (opSpec instanceof PartialJoinOperatorSpec) {
+      // every join will have two partial join operators
+      // we will choose one of them in order to consolidate the inputs
+      // the first one who registered with the outputStreamToJoinSpec will win
+      MessageStream output = opSpec.getNextStream();
+      OperatorSpec joinSpec = outputStreamToJoinSpec.get(output);
+      if (joinSpec == null) {
+        joinSpec = opSpec;
+        outputStreamToJoinSpec.put(output, joinSpec);
+      } else if (joinSpec != opSpec) {
+        OperatorJson joinNode = operators.get(joinSpec.getOpId());
+        joinNode.pairedOpId = opJson.opId;
+        opJson.pairedOpId = joinNode.opId;
+      }
+    }
+
+    return opJson;
+  }
+
+  /**
+   * Get or create the JSON POJO for a {@link StreamEdge}
+   * @param edge {@link StreamEdge}
+   * @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
+   */
+  private StreamEdgeJson getOrCreateStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) {
+    String streamId = edge.getStreamSpec().getId();
+    StreamEdgeJson edgeJson = streamEdges.get(streamId);
+    if (edgeJson == null) {
+      edgeJson = new StreamEdgeJson();
+      StreamSpecJson streamSpecJson = new StreamSpecJson();
+      streamSpecJson.id = streamId;
+      streamSpecJson.systemName = edge.getStreamSpec().getSystemName();
+      streamSpecJson.physicalName = edge.getStreamSpec().getPhysicalName();
+      streamSpecJson.partitionCount = edge.getPartitionCount();
+      edgeJson.streamSpec = streamSpecJson;
+      streamEdges.put(streamId, edgeJson);
+    }
+    return edgeJson;
+  }
+}
index c47a69c..e19c9ca 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,17 +47,23 @@ public class JobNode {
   private final String jobName;
   private final String jobId;
   private final String id;
+  private final StreamGraphImpl streamGraph;
   private final List<StreamEdge> inEdges = new ArrayList<>();
   private final List<StreamEdge> outEdges = new ArrayList<>();
   private final Config config;
 
-  JobNode(String jobName, String jobId, Config config) {
+  JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config config) {
     this.jobName = jobName;
     this.jobId = jobId;
     this.id = createId(jobName, jobId);
+    this.streamGraph = streamGraph;
     this.config = config;
   }
 
+  public StreamGraphImpl getStreamGraph() {
+    return streamGraph;
+  }
+
   public  String getId() {
     return id;
   }
@@ -85,7 +92,7 @@ public class JobNode {
     return outEdges;
   }
 
-  public Config generateConfig() {
+  public JobConfig generateConfig() {
     Map<String, String> configs = new HashMap<>();
     configs.put(JobConfig.JOB_NAME(), jobName);
 
@@ -95,7 +102,7 @@ public class JobNode {
 
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
     // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
-    return Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix));
+    return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix)));
   }
 
   /**
index 4cb0d28..76e4e76 100644 (file)
 
 package org.apache.samza.runtime;
 
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.execution.ExecutionPlanner;
-import org.apache.samza.execution.JobGraph;
-import org.apache.samza.execution.JobNode;
 import org.apache.samza.execution.StreamManager;
+import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,18 +56,14 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   public void run(StreamApplication app) {
     try {
       // 1. initialize and plan
-      JobGraph jobGraph = getExecutionPlan(app);
+      ExecutionPlan plan = getExecutionPlan(app);
 
       // 2. create the necessary streams
-      List<StreamSpec> streams = jobGraph.getIntermediateStreams().stream()
-          .map(streamEdge -> streamEdge.getStreamSpec())
-          .collect(Collectors.toList());
-      streamManager.createStreams(streams);
+      streamManager.createStreams(plan.getIntermediateStreams());
 
       // 3. submit jobs for remote execution
-      jobGraph.getJobNodes().forEach(job -> {
-          Config jobConfig = job.generateConfig();
-          log.info("Starting job {} with config {}", job.getId(), jobConfig);
+      plan.getJobConfigs().forEach(jobConfig -> {
+          log.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
           JobRunner runner = new JobRunner(jobConfig);
           runner.run(true);
         });
@@ -82,11 +75,10 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication app) {
     try {
-      JobGraph jobGraph = getExecutionPlan(app);
+      ExecutionPlan plan = getExecutionPlan(app);
 
-      jobGraph.getJobNodes().forEach(job -> {
-          Config jobConfig = job.generateConfig();
-          log.info("Killing job {}", job.getId());
+      plan.getJobConfigs().forEach(jobConfig -> {
+          log.info("Killing job {}", jobConfig.getName());
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
@@ -101,12 +93,11 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       boolean finished = false;
       boolean unsuccessfulFinish = false;
 
-      JobGraph jobGraph = getExecutionPlan(app);
-      for (JobNode job : jobGraph.getJobNodes()) {
-        Config jobConfig = job.generateConfig();
+      ExecutionPlan plan = getExecutionPlan(app);
+      for (JobConfig jobConfig : plan.getJobConfigs()) {
         JobRunner runner = new JobRunner(jobConfig);
         ApplicationStatus status = runner.status();
-        log.debug("Status is {} for jopb {}", new Object[]{status, job.getId()});
+        log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
 
         switch (status) {
           case Running:
@@ -132,7 +123,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
     }
   }
 
-  private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
+  private ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
     // build stream graph
     StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
     app.init(streamGraph, config);
index e661798..c55fcd0 100644 (file)
@@ -65,26 +65,7 @@ public class TestExecutionPlanner {
   private StreamSpec output1;
   private StreamSpec output2;
 
-  private JoinFunction createJoin() {
-    return new JoinFunction() {
-      @Override
-      public Object apply(Object message, Object otherMessage) {
-        return null;
-      }
-
-      @Override
-      public Object getFirstKey(Object message) {
-        return null;
-      }
-
-      @Override
-      public Object getSecondKey(Object message) {
-        return null;
-      }
-    };
-  }
-
-  private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
+  static SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
 
     return new SystemAdmin() {
       @Override
@@ -162,8 +143,8 @@ public class TestExecutionPlanner {
     OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
     OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", null, null);
 
-    m1.join(m2, createJoin(), Duration.ofHours(2)).sendTo(output1);
-    m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(output2);
+    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
+    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
 
     return streamGraph;
   }
@@ -239,7 +220,7 @@ public class TestExecutionPlanner {
     assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8);
     assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16);
 
-    jobGraph.getIntermediateStreams().forEach(edge -> {
+    jobGraph.getIntermediateStreamEdges().forEach(edge -> {
         assertTrue(edge.getPartitionCount() == -1);
       });
   }
index d829b64..4a4498c 100644 (file)
@@ -57,16 +57,16 @@ public class TestJobGraph {
    * 2 9 10
    */
   private void createGraph1() {
-    graph1 = new JobGraph(null, null);
+    graph1 = new JobGraph(null);
 
-    JobNode n2 = graph1.getOrCreateNode("2", "1");
-    JobNode n3 = graph1.getOrCreateNode("3", "1");
-    JobNode n5 = graph1.getOrCreateNode("5", "1");
-    JobNode n7 = graph1.getOrCreateNode("7", "1");
-    JobNode n8 = graph1.getOrCreateNode("8", "1");
-    JobNode n9 = graph1.getOrCreateNode("9", "1");
-    JobNode n10 = graph1.getOrCreateNode("10", "1");
-    JobNode n11 = graph1.getOrCreateNode("11", "1");
+    JobNode n2 = graph1.getOrCreateNode("2", "1", null);
+    JobNode n3 = graph1.getOrCreateNode("3", "1", null);
+    JobNode n5 = graph1.getOrCreateNode("5", "1", null);
+    JobNode n7 = graph1.getOrCreateNode("7", "1", null);
+    JobNode n8 = graph1.getOrCreateNode("8", "1", null);
+    JobNode n9 = graph1.getOrCreateNode("9", "1", null);
+    JobNode n10 = graph1.getOrCreateNode("10", "1", null);
+    JobNode n11 = graph1.getOrCreateNode("11", "1", null);
 
     graph1.addSource(genStream(), n5);
     graph1.addSource(genStream(), n7);
@@ -90,15 +90,15 @@ public class TestJobGraph {
    *      |<---6 <--|    <>
    */
   private void createGraph2() {
-    graph2 = new JobGraph(null, null);
+    graph2 = new JobGraph(null);
 
-    JobNode n1 = graph2.getOrCreateNode("1", "1");
-    JobNode n2 = graph2.getOrCreateNode("2", "1");
-    JobNode n3 = graph2.getOrCreateNode("3", "1");
-    JobNode n4 = graph2.getOrCreateNode("4", "1");
-    JobNode n5 = graph2.getOrCreateNode("5", "1");
-    JobNode n6 = graph2.getOrCreateNode("6", "1");
-    JobNode n7 = graph2.getOrCreateNode("7", "1");
+    JobNode n1 = graph2.getOrCreateNode("1", "1", null);
+    JobNode n2 = graph2.getOrCreateNode("2", "1", null);
+    JobNode n3 = graph2.getOrCreateNode("3", "1", null);
+    JobNode n4 = graph2.getOrCreateNode("4", "1", null);
+    JobNode n5 = graph2.getOrCreateNode("5", "1", null);
+    JobNode n6 = graph2.getOrCreateNode("6", "1", null);
+    JobNode n7 = graph2.getOrCreateNode("7", "1", null);
 
     graph2.addSource(genStream(), n1);
     graph2.addIntermediateStream(genStream(), n1, n2);
@@ -117,10 +117,10 @@ public class TestJobGraph {
    * 1<->1 -> 2<->2
    */
   private void createGraph3() {
-    graph3 = new JobGraph(null, null);
+    graph3 = new JobGraph(null);
 
-    JobNode n1 = graph3.getOrCreateNode("1", "1");
-    JobNode n2 = graph3.getOrCreateNode("2", "1");
+    JobNode n1 = graph3.getOrCreateNode("1", "1", null);
+    JobNode n2 = graph3.getOrCreateNode("2", "1", null);
 
     graph3.addSource(genStream(), n1);
     graph3.addIntermediateStream(genStream(), n1, n1);
@@ -133,9 +133,9 @@ public class TestJobGraph {
    * 1<->1
    */
   private void createGraph4() {
-    graph4 = new JobGraph(null, null);
+    graph4 = new JobGraph(null);
 
-    JobNode n1 = graph4.getOrCreateNode("1", "1");
+    JobNode n1 = graph4.getOrCreateNode("1", "1", null);
 
     graph4.addSource(genStream(), n1);
     graph4.addIntermediateStream(genStream(), n1, n1);
@@ -151,7 +151,7 @@ public class TestJobGraph {
 
   @Test
   public void testAddSource() {
-    JobGraph graph = new JobGraph(null, null);
+    JobGraph graph = new JobGraph(null);
 
     /**
      * s1 -> 1
@@ -160,9 +160,9 @@ public class TestJobGraph {
      * s3 -> 2
      *   |-> 3
      */
-    JobNode n1 = graph.getOrCreateNode("1", "1");
-    JobNode n2 = graph.getOrCreateNode("2", "1");
-    JobNode n3 = graph.getOrCreateNode("3", "1");
+    JobNode n1 = graph.getOrCreateNode("1", "1", null);
+    JobNode n2 = graph.getOrCreateNode("2", "1", null);
+    JobNode n3 = graph.getOrCreateNode("3", "1", null);
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
@@ -173,9 +173,9 @@ public class TestJobGraph {
 
     assertTrue(graph.getSources().size() == 3);
 
-    assertTrue(graph.getOrCreateNode("1", "1").getInEdges().size() == 2);
-    assertTrue(graph.getOrCreateNode("2", "1").getInEdges().size() == 1);
-    assertTrue(graph.getOrCreateNode("3", "1").getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("1", "1", null).getInEdges().size() == 2);
+    assertTrue(graph.getOrCreateNode("2", "1", null).getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("3", "1", null).getInEdges().size() == 1);
 
     assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0);
     assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1);
@@ -192,9 +192,9 @@ public class TestJobGraph {
      * 2 -> s2
      * 2 -> s3
      */
-    JobGraph graph = new JobGraph(null, null);
-    JobNode n1 = graph.getOrCreateNode("1", "1");
-    JobNode n2 = graph.getOrCreateNode("2", "1");
+    JobGraph graph = new JobGraph(null);
+    JobNode n1 = graph.getOrCreateNode("1", "1", null);
+    JobNode n2 = graph.getOrCreateNode("2", "1", null);
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
@@ -203,8 +203,8 @@ public class TestJobGraph {
     graph.addSink(s3, n2);
 
     assertTrue(graph.getSinks().size() == 3);
-    assertTrue(graph.getOrCreateNode("1", "1").getOutEdges().size() == 1);
-    assertTrue(graph.getOrCreateNode("2", "1").getOutEdges().size() == 2);
+    assertTrue(graph.getOrCreateNode("1", "1", null).getOutEdges().size() == 1);
+    assertTrue(graph.getOrCreateNode("2", "1", null).getOutEdges().size() == 2);
 
     assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1);
     assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0);
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
new file mode 100644 (file)
index 0000000..9f9945b
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestJobGraphJsonGenerator {
+
+  @Test
+  public void test() throws Exception {
+
+    /**
+     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
+     *
+     *                               input1 (64) -> map -> join -> output1 (8)
+     *                                                       |
+     *          input2 (16) -> partitionBy ("64") -> filter -|
+     *                                                       |
+     * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
+     *
+     */
+
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    Config config = new MapConfig(configMap);
+
+    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
+    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
+    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
+
+    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
+    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
+
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+    // intermediate streams used in tests
+    when(runner.getStreamSpec("test-app-1-partition_by-0"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-1"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-4"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
+
+    // set up external partition count
+    Map<String, Integer> system1Map = new HashMap<>();
+    system1Map.put("input1", 64);
+    system1Map.put("output1", 8);
+    Map<String, Integer> system2Map = new HashMap<>();
+    system2Map.put("input2", 16);
+    system2Map.put("input3", 32);
+    system2Map.put("output2", 16);
+
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
+    SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
+    systemAdmins.put("system1", systemAdmin1);
+    systemAdmins.put("system2", systemAdmin2);
+    StreamManager streamManager = new StreamManager(systemAdmins);
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", null, null);
+    OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", null, null);
+
+    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
+    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2);
+
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    ExecutionPlan plan = planner.plan(streamGraph);
+    String json = plan.getPlanAsJson();
+    System.out.println(json);
+
+    // deserialize
+    ObjectMapper mapper = new ObjectMapper();
+    JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
+    assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
+    assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12);
+    assertTrue(nodes.streams.size() == 7);
+  }
+}