SAMZA-1745: Remove all usages of StreamSpec and ApplicationRunner from the operator...
author    Prateek Maheshwari <pmaheshwari@apache.org>
Fri, 27 Jul 2018 18:24:00 +0000 (11:24 -0700)
committer Prateek Maheshwari <pmaheshwari@apache.org>
Fri, 27 Jul 2018 18:24:00 +0000 (11:24 -0700)
This PR is a prerequisite for adding support for user-provided SystemDescriptors and StreamDescriptors to the High Level API.

It removes all usages of StreamSpec and ApplicationRunner from the OperatorSpec and OperatorImpl layers. DAG specification (StreamGraphSpec, OperatorSpecs) now relies only on logical streamIds (and, in the future, will use the user-provided StreamDescriptors). DAG execution (i.e., StreamOperatorTask, OperatorImpls) now relies only on logical streamIds and their corresponding SystemStreams, which are obtained using StreamConfig in OperatorImplGraph.
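
For illustration, here is a minimal sketch (not part of this patch) of how the execution layer now maps a logical streamId to its physical SystemStream through StreamConfig, mirroring what OperatorImplGraph does after this change. The "page-views" streamId is hypothetical; the underlying config keys (streams.{streamId}.samza.system, streams.{streamId}.samza.physical.name) are the ones described in the removed ApplicationRunner javadoc.

    import org.apache.samza.config.Config;
    import org.apache.samza.config.StreamConfig;
    import org.apache.samza.system.SystemStream;

    public class StreamIdResolutionSketch {
      // Resolves a logical streamId to the SystemStream it is bound to in config,
      // the same call OperatorImplGraph now makes for each InputOperatorSpec.
      public static SystemStream resolve(Config config, String streamId) {
        StreamConfig streamConfig = new StreamConfig(config);
        return streamConfig.streamIdToSystemStream(streamId);
      }

      public static void main(String[] args) {
        // Hypothetical usage: resolve(config, "page-views")
      }
    }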

After this change, StreamSpec can be thought of as the API between StreamManager and SystemAdmins for creating and validating streams. Ideally, ExecutionPlanner wouldn't rely on StreamSpec either, but it currently does so extensively, so I'll leave that refactor for later.

Additional changes:
1. ApplicationRunner is no longer responsible for creating/returning StreamSpec instances. Instances can be created directly using the StreamSpec constructors, or by using one of the utility methods in the new StreamUtil class (see the sketch after this list).

2. The StreamSpec class no longer tracks the isBroadcast and isBounded status for streams.
The former was used to communicate broadcast status from the StreamGraphSpec to the planner so that it could write the broadcast input configurations. This is now done using a separate Set of broadcast streamIds in StreamGraphSpec.
The latter was set by the ApplicationRunner based on a config and then passed to the planner so that it could write the bounded input configs. This was redundant, so I removed it.
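
As a reference for item 1, a minimal sketch (not part of this patch) of the two ways a StreamSpec can now be obtained without an ApplicationRunner: directly via the StreamSpec constructors, or from config via the new StreamUtil helpers, as ExecutionPlanner now does. The streamId, physical name, and system name below are hypothetical.

    import org.apache.samza.config.Config;
    import org.apache.samza.config.StreamConfig;
    import org.apache.samza.system.StreamSpec;
    import org.apache.samza.util.StreamUtil;

    public class StreamSpecCreationSketch {
      public static void examples(Config config) {
        // 1. Construct directly; partition count and stream config take defaults.
        StreamSpec direct = new StreamSpec("page-views", "PageViewEvent", "kafka");

        // 2. Resolve from the streams.{streamId}.* config via StreamUtil.
        StreamConfig streamConfig = new StreamConfig(config);
        StreamSpec fromConfig = StreamUtil.getStreamSpec("page-views", streamConfig);
      }
    }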

Author: Prateek Maheshwari <pmaheshwari@apache.org>
Author: Prateek Maheshwari <prateekm@utexas.edu>
Author: Prateek Maheshwari <pmaheshwari@linkedin.com>
Author: prateekm <prateekm@utexas.edu>

Reviewers: Jagadish Venkatraman <vjagadish1989@gmail.com>, Yi Pan <nickpan47@gmail.com>, Cameron Lee <calee@linkedin.com>

Closes #552 from prateekm/stream-spec-cleanup

57 files changed:
samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
samza-core/src/main/java/org/apache/samza/execution/JobNode.java
samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
samza-core/src/main/java/org/apache/samza/util/StreamUtil.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
samza-core/src/main/scala/org/apache/samza/util/Util.scala
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java [moved from samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java with 57% similarity]
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java

index fad9cf8..e2f5d0d 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface MapFunction<M, OM>  extends InitableFunction, ClosableFunction, Serializable {
+public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Transforms the provided message into another message.
index 8339429..45abb5d 100644 (file)
@@ -24,7 +24,6 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.system.StreamSpec;
 
 import java.lang.reflect.Constructor;
 
@@ -124,25 +123,4 @@ public abstract class ApplicationRunner {
   public boolean waitForFinish(Duration timeout) {
     throw new UnsupportedOperationException(getClass().getName() + " does not support timed waitForFinish.");
   }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
-   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
-   *
-   * <ul>
-   *   <li>samza.system -         The name of the System on which this stream will be used. If this property isn't defined
-   *                              the stream will be associated with the System defined in {@code job.default.system}</li>
-   *   <li>samza.physical.name -  The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   *                              If this property isn't defined the physical.name will be set to the streamId</li>
-   * </ul>
-   *
-   * @param streamId  The logical identifier for the stream in Samza.
-   * @return          The {@link StreamSpec} instance.
-   */
-  public abstract StreamSpec getStreamSpec(String streamId);
 }
index cd86426..aa71f0e 100644 (file)
@@ -76,24 +76,10 @@ public class StreamSpec implements Serializable {
   private final int partitionCount;
 
   /**
-   * Bounded or unbounded stream
-   */
-  private final boolean isBounded;
-
-  /**
-   * broadcast stream to all tasks
-   */
-  private final boolean isBroadcast;
-
-  /**
    * A set of all system-specific configurations for the stream.
    */
   private final Map<String, String> config;
 
-  @Override
-  public String toString() {
-    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
-  }
   /**
    *  @param id           The application-unique logical identifier for the stream. It is used to distinguish between
    *                      streams in a Samza application so it must be unique in the context of one deployable unit.
@@ -107,7 +93,7 @@ public class StreamSpec implements Serializable {
    *                      Samza System abstraction. See {@link SystemFactory}
    */
   public StreamSpec(String id, String physicalName, String systemName) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, false, false, Collections.emptyMap());
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap());
   }
 
   /**
@@ -126,7 +112,7 @@ public class StreamSpec implements Serializable {
    * @param partitionCount  The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount) {
-    this(id, physicalName, systemName, partitionCount, false, false, Collections.emptyMap());
+    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
   }
 
   /**
@@ -141,12 +127,10 @@ public class StreamSpec implements Serializable {
    * @param systemName    The System name on which this stream will exist. Corresponds to a named implementation of the
    *                      Samza System abstraction. See {@link SystemFactory}
    *
-   * @param isBounded     The stream is bounded or not.
-   *
    * @param config        A map of properties for the stream. These may be System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, boolean isBounded, Map<String, String> config) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, isBounded, false, config);
+  public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) {
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
   }
 
   /**
@@ -161,16 +145,11 @@ public class StreamSpec implements Serializable {
    * @param systemName      The System name on which this stream will exist. Corresponds to a named implementation of the
    *                        Samza System abstraction. See {@link SystemFactory}
    *
-   * @param partitionCount  The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
-   *
-   * @param isBounded       The stream is bounded or not.
-   *
-   * @param isBroadcast     This stream is broadcast or not.
+   * @param partitionCount  The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
    *
    * @param config          A map of properties for the stream. These may be System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, int partitionCount,
-                    boolean isBounded, boolean isBroadcast, Map<String, String> config) {
+  public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) {
     validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);
 
@@ -183,8 +162,6 @@ public class StreamSpec implements Serializable {
     this.systemName = systemName;
     this.physicalName = physicalName;
     this.partitionCount = partitionCount;
-    this.isBounded = isBounded;
-    this.isBroadcast = isBroadcast;
 
     if (config != null) {
       this.config = Collections.unmodifiableMap(new HashMap<>(config));
@@ -202,15 +179,11 @@ public class StreamSpec implements Serializable {
    * @return                A copy of this StreamSpec with the specified partitionCount.
    */
   public StreamSpec copyWithPartitionCount(int partitionCount) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, this.isBroadcast, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
   }
 
   public StreamSpec copyWithPhysicalName(String physicalName) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, this.isBroadcast, config);
-  }
-
-  public StreamSpec copyWithBroadCast() {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, true, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
   }
 
   public String getId() {
@@ -253,14 +226,6 @@ public class StreamSpec implements Serializable {
     return id.equals(COORDINATOR_STREAM_ID);
   }
 
-  public boolean isBounded() {
-    return isBounded;
-  }
-
-  public boolean isBroadcast() {
-    return isBroadcast;
-  }
-
   private void validateLogicalIdentifier(String identifierName, String identifierValue) {
     if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
       throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
@@ -297,4 +262,9 @@ public class StreamSpec implements Serializable {
   public static StreamSpec createStreamAppenderStreamSpec(String physicalName, String systemName, int partitionCount) {
     return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, partitionCount);
   }
+
+  @Override
+  public String toString() {
+    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
+  }
 }
index 29dd3ef..c5b2183 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.samza.checkpoint.CheckpointManagerFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,7 +103,7 @@ public class TaskConfigJava extends MapConfig {
       } else {
         String systemStreamName = systemStreamPartition.substring(0, hashPosition);
         String partitionSegment = systemStreamPartition.substring(hashPosition + 1);
-        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamName);
+        SystemStream systemStream = StreamUtil.getSystemStreamFromNames(systemStreamName);
 
         if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
           systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(partitionSegment))));
index 48f939c..ef52e90 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -43,6 +44,8 @@ import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.samza.util.StreamUtil.*;
+
 
 /**
  * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
@@ -93,8 +96,9 @@ public class ExecutionPlanner {
    */
   /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
     JobGraph jobGraph = new JobGraph(config, specGraph);
-    Set<StreamSpec> sourceStreams = new HashSet<>(specGraph.getInputOperators().keySet());
-    Set<StreamSpec> sinkStreams = new HashSet<>(specGraph.getOutputStreams().keySet());
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<StreamSpec> sourceStreams = getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+    Set<StreamSpec> sinkStreams = getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
     intStreams.retainAll(sinkStreams);
@@ -128,7 +132,7 @@ public class ExecutionPlanner {
    */
   /* package private */ void calculatePartitions(JobGraph jobGraph) {
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(jobGraph);
+    calculateJoinInputPartitions(jobGraph, config);
 
     // calculate the partitions for the rest of intermediate streams
     calculateIntStreamPartitions(jobGraph, config);
@@ -172,7 +176,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph) {
+  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, Config config) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -183,10 +187,10 @@ public class ExecutionPlanner {
     Set<OperatorSpec> visited = new HashSet<>();
 
     jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> {
-        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
+        StreamConfig streamConfig = new StreamConfig(config);
+        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(entry.getKey(), streamConfig));
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
-        findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
-            joinQ, visited);
+        findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
       });
 
     // At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
@@ -203,7 +207,7 @@ public class ExecutionPlanner {
           } else if (partitions != edgePartitions) {
             throw  new SamzaException(String.format(
                 "Unable to resolve input partitions of stream %s for join. Expected: %d, Actual: %d",
-                edge.getFormattedSystemStream(), partitions, edgePartitions));
+                edge.getName(), partitions, edgePartitions));
           }
         }
       }
@@ -282,7 +286,7 @@ public class ExecutionPlanner {
   private static void validatePartitions(JobGraph jobGraph) {
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
-        throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getFormattedSystemStream()));
+        throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getName()));
       }
     }
   }
index 843db85..2f210f2 100644 (file)
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
   @Override
   public List<StreamSpec> getIntermediateStreams() {
     return getIntermediateStreamEdges().stream()
-        .map(streamEdge -> streamEdge.getStreamSpec())
+        .map(StreamEdge::getStreamSpec)
         .collect(Collectors.toList());
   }
 
@@ -187,12 +187,10 @@ import org.slf4j.LoggerFactory;
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
-      edge = new StreamEdge(streamSpec, isIntermediate, config);
+      boolean isBroadcast = specGraph.getBroadcastStreams().contains(streamId);
+      edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
       edges.put(streamId, edge);
     }
-    if (streamSpec.isBroadcast()) {
-      edge.setPartitionCount(1);
-    }
     return edge;
   }
 
@@ -262,11 +260,11 @@ import org.slf4j.LoggerFactory;
     sources.forEach(edge -> {
         if (!edge.getSourceNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
+              String.format("Source stream %s should not have producers.", edge.getName()));
         }
         if (edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
+              String.format("Source stream %s should have consumers.", edge.getName()));
         }
       });
   }
@@ -278,11 +276,11 @@ import org.slf4j.LoggerFactory;
     sinks.forEach(edge -> {
         if (!edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
+              String.format("Sink stream %s should not have consumers", edge.getName()));
         }
         if (edge.getSourceNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
+              String.format("Sink stream %s should have producers", edge.getName()));
         }
       });
   }
@@ -298,7 +296,7 @@ import org.slf4j.LoggerFactory;
     internalEdges.forEach(edge -> {
         if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+              String.format("Internal stream %s should have both producers and consumers", edge.getName()));
         }
       });
   }
index 298042b..4f2aa23 100644 (file)
@@ -170,10 +170,10 @@ import org.codehaus.jackson.map.ObjectMapper;
   private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
+    jobNode.getSpecGraph().getInputOperators().forEach((streamId, operatorSpec) -> {
         StreamJson inputJson = new StreamJson();
         opGraph.inputStreams.add(inputJson);
-        inputJson.streamId = streamSpec.getId();
+        inputJson.streamId = streamId;
         Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
         inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
 
@@ -181,9 +181,9 @@ import org.codehaus.jackson.map.ObjectMapper;
       });
 
     opGraph.outputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamId -> {
         StreamJson outputJson = new StreamJson();
-        outputJson.streamId = streamSpec.getId();
+        outputJson.streamId = streamId;
         opGraph.outputStreams.add(outputJson);
       });
     return opGraph;
@@ -219,10 +219,10 @@ import org.codehaus.jackson.map.ObjectMapper;
 
     if (spec instanceof OutputOperatorSpec) {
       OutputStreamImpl outputStream = ((OutputOperatorSpec) spec).getOutputStream();
-      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+      map.put("outputStreamId", outputStream.getStreamId());
     } else if (spec instanceof PartitionByOperatorSpec) {
       OutputStreamImpl outputStream = ((PartitionByOperatorSpec) spec).getOutputStream();
-      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+      map.put("outputStreamId", outputStream.getStreamId());
     }
 
     if (spec instanceof StreamTableJoinOperatorSpec) {
index 288b1a1..dba47e1 100644 (file)
@@ -49,8 +49,8 @@ import org.apache.samza.table.TableConfigGenerator;
 import org.apache.samza.util.MathUtil;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,8 +133,8 @@ public class JobNode {
     final List<String> inputs = new ArrayList<>();
     final List<String> broadcasts = new ArrayList<>();
     for (StreamEdge inEdge : inEdges) {
-      String formattedSystemStream = inEdge.getFormattedSystemStream();
-      if (inEdge.getStreamSpec().isBroadcast()) {
+      String formattedSystemStream = inEdge.getName();
+      if (inEdge.isBroadcast()) {
         broadcasts.add(formattedSystemStream + "#0");
       } else {
         inputs.add(formattedSystemStream);
@@ -184,9 +184,9 @@ public class JobNode {
         List<String> sideInputs = tableSpec.getSideInputs();
         if (sideInputs != null && !sideInputs.isEmpty()) {
           sideInputs.stream()
-              .map(sideInput -> Util.getSystemStreamFromNameOrId(config, sideInput))
+              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
               .forEach(systemStream -> {
-                  inputs.add(Util.getNameFromSystemStream(systemStream));
+                  inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
                   configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
                       systemStream.getSystem(), systemStream.getStream()), "true");
                 });
@@ -230,17 +230,17 @@ public class JobNode {
     // collect all key and msg serde instances for streams
     Map<String, Serde> streamKeySerdes = new HashMap<>();
     Map<String, Serde> streamMsgSerdes = new HashMap<>();
-    Map<StreamSpec, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
+    Map<String, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
     inEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
-        InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
+        InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
         streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
         streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
       });
-    Map<StreamSpec, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
+    Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
     outEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
-        OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
+        OutputStreamImpl outputStream = outputStreams.get(streamId);
         streamKeySerdes.put(streamId, outputStream.getKeySerde());
         streamMsgSerdes.put(streamId, outputStream.getValueSerde());
       });
index b4c93d9..f2f0310 100644 (file)
@@ -29,7 +29,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 
 
 /**
@@ -41,23 +41,24 @@ public class StreamEdge {
   public static final int PARTITIONS_UNKNOWN = -1;
 
   private final StreamSpec streamSpec;
+  private final boolean isBroadcast;
+  private final boolean isIntermediate;
   private final List<JobNode> sourceNodes = new ArrayList<>();
   private final List<JobNode> targetNodes = new ArrayList<>();
   private final Config config;
+  private final String name;
 
-  private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
-  private final boolean isIntermediate;
-
-  StreamEdge(StreamSpec streamSpec, Config config) {
-    this(streamSpec, false, config);
-  }
 
-  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, Config config) {
+  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, boolean isBroadcast, Config config) {
     this.streamSpec = streamSpec;
-    this.name = Util.getNameFromSystemStream(getSystemStream());
     this.isIntermediate = isIntermediate;
+    this.isBroadcast = isBroadcast;
     this.config = config;
+    if (isBroadcast) {
+      partitions = 1;
+    }
+    this.name = StreamUtil.getNameFromSystemStream(getSystemStream());
   }
 
   void addSourceNode(JobNode sourceNode) {
@@ -85,10 +86,6 @@ public class StreamEdge {
     return getStreamSpec().toSystemStream();
   }
 
-  String getFormattedSystemStream() {
-    return Util.getNameFromSystemStream(getSystemStream());
-  }
-
   List<JobNode> getSourceNodes() {
     return sourceNodes;
   }
@@ -109,10 +106,6 @@ public class StreamEdge {
     return name;
   }
 
-  void setName(String name) {
-    this.name = name;
-  }
-
   boolean isIntermediate() {
     return isIntermediate;
   }
@@ -128,12 +121,13 @@ public class StreamEdge {
       config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
       config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
-    if (spec.isBounded()) {
-      config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");
-    }
     spec.getConfig().forEach((property, value) -> {
         config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
       });
     return new MapConfig(config);
   }
+
+  public boolean isBroadcast() {
+    return isBroadcast;
+  }
 }
index 7f60f96..2921f3b 100644 (file)
@@ -38,6 +38,7 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -142,7 +143,7 @@ public class StreamManager {
             .getOrElse(defaultValue(null));
         if (changelog != null) {
           LOGGER.info("Clear store {} changelog {}", store, changelog);
-          SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
+          SystemStream systemStream = StreamUtil.getSystemStreamFromNames(changelog);
           StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
           systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
         }
index ba51c7c..b6c3dae 100644 (file)
@@ -29,7 +29,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 
 
@@ -41,8 +40,9 @@ import org.apache.samza.table.TableSpec;
  */
 public class OperatorSpecGraph implements Serializable {
   // We use a LHM for deterministic order in initializing and closing operators.
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators;
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams;
+  private final Map<String, InputOperatorSpec> inputOperators;
+  private final Map<String, OutputStreamImpl> outputStreams;
+  private final Set<String> broadcastStreams;
   private final Map<TableSpec, TableImpl> tables;
   private final Set<OperatorSpec> allOpSpecs;
   private final boolean hasWindowOrJoins;
@@ -54,20 +54,25 @@ public class OperatorSpecGraph implements Serializable {
   OperatorSpecGraph(StreamGraphSpec graphSpec) {
     this.inputOperators = graphSpec.getInputOperators();
     this.outputStreams = graphSpec.getOutputStreams();
+    this.broadcastStreams = graphSpec.getBroadcastStreams();
     this.tables = graphSpec.getTables();
     this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
     this.hasWindowOrJoins = checkWindowOrJoins();
     this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
   }
 
-  public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+  public Map<String, InputOperatorSpec> getInputOperators() {
     return inputOperators;
   }
 
-  public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+  public Map<String, OutputStreamImpl> getOutputStreams() {
     return outputStreams;
   }
 
+  public Set<String> getBroadcastStreams() {
+    return broadcastStreams;
+  }
+
   public Map<TableSpec, TableImpl> getTables() {
     return tables;
   }
index ea9690b..a187b94 100644 (file)
@@ -34,11 +34,9 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
@@ -55,13 +53,13 @@ import com.google.common.base.Preconditions;
  */
 public class StreamGraphSpec implements StreamGraph {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class);
-  private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");
 
   // We use a LHM for deterministic order in initializing and closing operators.
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
   private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
-  private final ApplicationRunner runner;
   private final Config config;
 
   /**
@@ -74,10 +72,7 @@ public class StreamGraphSpec implements StreamGraph {
   private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
   private ContextManager contextManager = null;
 
-  public StreamGraphSpec(ApplicationRunner runner, Config config) {
-    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphSpec once Systems
-    // can use streamId to send and receive messages.
-    this.runner = runner;
+  public StreamGraphSpec(Config config) {
     this.config = config;
   }
 
@@ -91,15 +86,15 @@ public class StreamGraphSpec implements StreamGraph {
 
   @Override
   public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    Preconditions.checkState(isValidId(streamId),
+        "streamId must be non-empty and must not contain spaces or special characters: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
         "getInputStream must not be called multiple times with the same streamId: " + streamId);
 
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamSpec)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+    if (outputStreams.containsKey(streamId)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
       Serde keySerde = outputStream.getKeySerde();
       Serde valueSerde = outputStream.getValueSerde();
       Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
@@ -109,10 +104,10 @@ public class StreamGraphSpec implements StreamGraph {
 
     boolean isKeyed = serde instanceof KVSerde;
     InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
             isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl<>(this, inputOperators.get(streamId));
   }
 
   @Override
@@ -122,15 +117,15 @@ public class StreamGraphSpec implements StreamGraph {
 
   @Override
   public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    Preconditions.checkState(isValidId(streamId),
+        "streamId must be non-empty and must not contain spaces or special characters: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
-    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
         "getOutputStream must not be called multiple times with the same streamId: " + streamId);
 
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamSpec)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+    if (inputOperators.containsKey(streamId)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
       Serde keySerde = inputOperatorSpec.getKeySerde();
       Serde valueSerde = inputOperatorSpec.getValueSerde();
       Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
@@ -139,8 +134,8 @@ public class StreamGraphSpec implements StreamGraph {
     }
 
     boolean isKeyed = serde instanceof KVSerde;
-    outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return outputStreams.get(streamSpec);
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
   }
 
   @Override
@@ -183,8 +178,8 @@ public class StreamGraphSpec implements StreamGraph {
    * @return the unique ID for the next operator in the graph
    */
   public String getNextOpId(OpCode opCode, String userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
-      throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
+    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
     }
 
     String nextOpId = String.format("%s-%s-%s-%s",
@@ -234,17 +229,10 @@ public class StreamGraphSpec implements StreamGraph {
    * @param isBroadcast whether the stream is a broadcast stream.
    * @param <M> the type of messages in the intermediate {@link MessageStream}
    * @return  the intermediate {@link MessageStreamImpl}
-   *
-   * TODO: once SAMZA-1566 is resolved, we should be able to pass in the StreamSpec directly.
    */
   @VisibleForTesting
   <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    if (isBroadcast) {
-      streamSpec = streamSpec.copyWithBroadCast();
-    }
-
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
         "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
 
     if (serde == null) {
@@ -252,28 +240,37 @@ public class StreamGraphSpec implements StreamGraph {
       serde = (Serde<M>) defaultSerde;
     }
 
+    if (isBroadcast) broadcastStreams.add(streamId);
     boolean isKeyed = serde instanceof KVSerde;
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
     InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
             isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
   }
 
-  Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+  Map<String, InputOperatorSpec> getInputOperators() {
     return Collections.unmodifiableMap(inputOperators);
   }
 
-  Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+  Map<String, OutputStreamImpl> getOutputStreams() {
     return Collections.unmodifiableMap(outputStreams);
   }
 
+  Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
   Map<TableSpec, TableImpl> getTables() {
     return Collections.unmodifiableMap(tables);
   }
 
+  private boolean isValidId(String id) {
+    return StringUtils.isNotBlank(id) && ID_PATTERN.matcher(id).matches();
+  }
+
   private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
     Serde keySerde, valueSerde;
 
index 8df670e..99ed089 100644 (file)
@@ -40,9 +40,9 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final SystemStream systemStream;
   private final String taskName;
 
-  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, TaskContext context) {
+  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream systemStream, TaskContext context) {
     this.broadcastOpSpec = broadcastOpSpec;
-    this.systemStream = broadcastOpSpec.getOutputStream().getSystemStream();
+    this.systemStream = systemStream;
     this.taskName = context.getTaskName().getTaskName();
   }
 
index df73e48..7f62e00 100644 (file)
@@ -22,6 +22,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
@@ -96,12 +97,14 @@ public class OperatorImplGraph {
    */
   public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, TaskContext context, Clock clock) {
     this.clock = clock;
-
+    StreamConfig streamConfig = new StreamConfig(config);
     TaskContextImpl taskContext = (TaskContextImpl) context;
-    Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(specGraph) ?
-        getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
-            getIntermediateToInputStreamsMap(specGraph)) :
-        Collections.EMPTY_MAP;
+    Map<SystemStream, Integer> producerTaskCounts =
+        hasIntermediateStreams(specGraph)
+            ? getProducerTaskCountForIntermediateStreams(
+                getStreamToConsumerTasks(taskContext.getJobModel()),
+                getIntermediateToInputStreamsMap(specGraph, streamConfig))
+            : Collections.EMPTY_MAP;
     producerTaskCounts.forEach((stream, count) -> {
         LOG.info("{} has {} producer tasks.", stream, count);
       });
@@ -113,8 +116,8 @@ public class OperatorImplGraph {
     taskContext.registerObject(WatermarkStates.class.getName(),
         new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts));
 
-    specGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
-        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+    specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
+        SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
         InputOperatorImpl inputOperatorImpl =
             (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context);
         this.inputOperators.put(systemStream, inputOperatorImpl);
@@ -210,6 +213,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       Config config, TaskContext context) {
+    StreamConfig streamConfig = new StreamConfig(config);
     if (operatorSpec instanceof InputOperatorSpec) {
       return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof StreamOperatorSpec) {
@@ -217,9 +221,13 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof OutputOperatorSpec) {
-      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec);
+      String streamId = ((OutputOperatorSpec) operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
+      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, systemStream);
     } else if (operatorSpec instanceof PartitionByOperatorSpec) {
-      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, config, context);
+      String streamId = ((PartitionByOperatorSpec) operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
+      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
@@ -231,7 +239,9 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof SendToTableOperatorSpec) {
       return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof BroadcastOperatorSpec) {
-      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, context);
+      String streamId = ((BroadcastOperatorSpec) operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
+      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, systemStream, context);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
@@ -323,10 +333,6 @@ public class OperatorImplGraph {
     };
   }
 
-  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
-    return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet());
-  }
-
   /**
    * calculate the task count that produces to each intermediate streams
    * @param streamToConsumerTasks input streams to task mapping
@@ -337,12 +343,11 @@ public class OperatorImplGraph {
       Multimap<SystemStream, String> streamToConsumerTasks,
       Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
     Map<SystemStream, Integer> result = new HashMap<>();
-    intermediateToInputStreams.asMap().entrySet().forEach(entry -> {
+    intermediateToInputStreams.asMap().entrySet().forEach(entry ->
         result.put(entry.getKey(),
             entry.getValue().stream()
                 .flatMap(systemStream -> streamToConsumerTasks.get(systemStream).stream())
-                .collect(Collectors.toSet()).size());
-      });
+                .collect(Collectors.toSet()).size()));
     return result;
   }
 
@@ -368,25 +373,34 @@ public class OperatorImplGraph {
    * @param specGraph the user {@link OperatorSpecGraph}
    * @return mapping from output streams to input streams
    */
-  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(OperatorSpecGraph specGraph) {
+  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(
+      OperatorSpecGraph specGraph, StreamConfig streamConfig) {
     Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create();
     specGraph.getInputOperators().entrySet().stream()
-        .forEach(
-            entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams));
+        .forEach(entry -> {
+            SystemStream systemStream = streamConfig.streamIdToSystemStream(entry.getKey());
+            computeOutputToInput(systemStream, entry.getValue(), outputToInputStreams, streamConfig);
+          });
     return outputToInputStreams;
   }
 
   private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec,
-      Multimap<SystemStream, SystemStream> outputToInputStreams) {
+      Multimap<SystemStream, SystemStream> outputToInputStreams, StreamConfig streamConfig) {
     if (opSpec instanceof PartitionByOperatorSpec) {
       PartitionByOperatorSpec spec = (PartitionByOperatorSpec) opSpec;
-      outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(), input);
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(spec.getOutputStream().getStreamId());
+      outputToInputStreams.put(systemStream, input);
     } else if (opSpec instanceof BroadcastOperatorSpec) {
       BroadcastOperatorSpec spec = (BroadcastOperatorSpec) opSpec;
-      outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(), input);
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(spec.getOutputStream().getStreamId());
+      outputToInputStreams.put(systemStream, input);
     } else {
       Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs();
-      nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams));
+      nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams, streamConfig));
     }
   }
+
+  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
+    return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet());
+  }
 }
index e625484..22fbb1b 100644 (file)
@@ -42,10 +42,10 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final OutputStreamImpl<M> outputStream;
   private final SystemStream systemStream;
 
-  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec) {
+  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, SystemStream systemStream) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
-    this.systemStream = outputStream.getSystemStream();
+    this.systemStream = systemStream;
   }
 
   @Override
index dd64429..63e269d 100644 (file)
@@ -20,10 +20,8 @@ package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.system.ControlMessage;
 import org.apache.samza.system.EndOfStreamMessage;
@@ -51,10 +49,10 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   private final String taskName;
   private final ControlMessageSender controlMessageSender;
 
-  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) {
+  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
+      SystemStream systemStream, TaskContext context) {
     this.partitionByOpSpec = partitionByOpSpec;
-    OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream();
-    this.systemStream = outputStream.getSystemStream();
+    this.systemStream = systemStream;
     this.keyFunction = partitionByOpSpec.getKeyFunction();
     this.valueFunction = partitionByOpSpec.getValueFunction();
     this.taskName = context.getTaskName().getTaskName();
@@ -102,7 +100,6 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   }
 
   private void sendControlMessage(ControlMessage message, MessageCollector collector) {
-    SystemStream outputStream = partitionByOpSpec.getOutputStream().getSystemStream();
-    controlMessageSender.send(message, outputStream, collector);
+    controlMessageSender.send(message, systemStream, collector);
   }
 }
index a636ac5..922a1f9 100644 (file)
@@ -22,7 +22,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 
 /**
  * The spec for an operator that receives incoming messages from an input stream
@@ -34,7 +33,7 @@ import org.apache.samza.system.StreamSpec;
 public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // Object == KV<K, V> | V
 
   private final boolean isKeyed;
-  private final StreamSpec streamSpec;
+  private final String streamId;
 
   /**
    * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the configs for a stream, and deserialized
@@ -43,17 +42,16 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { //
   private transient final Serde<K> keySerde;
   private transient final Serde<V> valueSerde;
 
-  public InputOperatorSpec(StreamSpec streamSpec,
-      Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+  public InputOperatorSpec(String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
     super(OpCode.INPUT, opId);
-    this.streamSpec = streamSpec;
+    this.streamId = streamId;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.isKeyed = isKeyed;
   }
 
-  public StreamSpec getStreamSpec() {
-    return this.streamSpec;
+  public String getStreamId() {
+    return this.streamId;
   }
 
   public Serde<K> getKeySerde() {
index 6e98d5a..9e788da 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 
 
@@ -42,7 +41,7 @@ public class OperatorSpecs {
   /**
    * Creates an {@link InputOperatorSpec} for consuming input.
    *
-   * @param streamSpec  the stream spec for the input stream
+   * @param streamId  the stream id for the input stream
    * @param keySerde  the serde for the input key
    * @param valueSerde  the serde for the input value
    * @param isKeyed  whether the input stream is keyed
@@ -52,8 +51,8 @@ public class OperatorSpecs {
    * @return  the {@link InputOperatorSpec}
    */
   public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec(
-    StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
-    return new InputOperatorSpec<>(streamSpec, keySerde, valueSerde, isKeyed, opId);
+    String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+    return new InputOperatorSpec<>(streamId, keySerde, valueSerde, isKeyed, opId);
   }
 
   /**
index fe0abcb..5d70e6f 100644 (file)
@@ -21,13 +21,10 @@ package org.apache.samza.operators.spec;
 import java.io.Serializable;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-
 
 public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
 
-  private final StreamSpec streamSpec;
+  private final String streamId;
   private final boolean isKeyed;
 
   /**
@@ -37,16 +34,15 @@ public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
   private transient final Serde keySerde;
   private transient final Serde valueSerde;
 
-  public OutputStreamImpl(StreamSpec streamSpec,
-      Serde keySerde, Serde valueSerde, boolean isKeyed) {
-    this.streamSpec = streamSpec;
+  public OutputStreamImpl(String streamId, Serde keySerde, Serde valueSerde, boolean isKeyed) {
+    this.streamId = streamId;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.isKeyed = isKeyed;
   }
 
-  public StreamSpec getStreamSpec() {
-    return streamSpec;
+  public String getStreamId() {
+    return streamId;
   }
 
   public Serde getKeySerde() {
@@ -57,10 +53,6 @@ public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
     return valueSerde;
   }
 
-  public SystemStream getSystemStream() {
-    return this.streamSpec.toSystemStream();
-  }
-
   public boolean isKeyed() {
     return isKeyed;
   }
index 272ba63..3bb8713 100644 (file)
@@ -24,7 +24,6 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,13 +50,13 @@ public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> imple
     this.outputStream = outputStream;
     if (inputOperatorSpec.isKeyed() != outputStream.isKeyed()) {
       LOGGER.error("Input and output streams for intermediate stream {} aren't keyed consistently. Input: {}, Output: {}",
-          new Object[]{inputOperatorSpec.getStreamSpec().getId(), inputOperatorSpec.isKeyed(), outputStream.isKeyed()});
+          new Object[]{inputOperatorSpec.getStreamId(), inputOperatorSpec.isKeyed(), outputStream.isKeyed()});
     }
     this.isKeyed = inputOperatorSpec.isKeyed() && outputStream.isKeyed();
   }
 
-  public StreamSpec getStreamSpec() {
-    return this.outputStream.getStreamSpec();
+  public String getStreamId() {
+    return this.outputStream.getStreamId();
   }
 
   public OutputStreamImpl<M> getOutputStream() {
index 3716d2b..7cd19fb 100644 (file)
@@ -38,7 +38,6 @@ import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,58 +55,7 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
 
   public AbstractApplicationRunner(Config config) {
     super(config);
-    this.graphSpec = new StreamGraphSpec(this, config);
-  }
-
-  @Override
-  public StreamSpec getStreamSpec(String streamId) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String physicalName = streamConfig.getPhysicalName(streamId);
-    return getStreamSpec(streamId, physicalName);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific with one exception. The following
-   * property is a Samza property which is used to bind the stream to a system.
-   *
-   * <ul>
-   *   <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
-   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
-   * </ul>
-   *
-   * @param streamId      The logical identifier for the stream in Samza.
-   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
-   * @return              The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String system = streamConfig.getSystem(streamId);
-
-    return getStreamSpec(streamId, physicalName, system);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   *
-   * @param streamId      The logical identifier for the stream in Samza.
-   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
-   * @param system        The name of the System on which this stream will be used.
-   * @return              The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName, String system) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    Map<String, String> properties = streamConfig.getStreamProperties(streamId);
-    boolean isBounded = streamConfig.getIsBounded(streamId);
-
-    return new StreamSpec(streamId, physicalName, system, isBounded, properties);
+    this.graphSpec = new StreamGraphSpec(config);
   }
 
   public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager streamManager) throws Exception {
@@ -118,20 +66,22 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   ExecutionPlan getExecutionPlan(StreamApplication app, String runId, StreamManager streamManager) throws Exception {
     // build stream graph
     app.init(graphSpec, config);
-
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
-    // create the physical execution plan
+
+    // update application configs
     Map<String, String> cfg = new HashMap<>(config);
     if (StringUtils.isNoneEmpty(runId)) {
       cfg.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
-    Set<StreamSpec> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
     inputStreams.removeAll(specGraph.getOutputStreams().keySet());
-    ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded)
+    ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
         ? ApplicationMode.BATCH : ApplicationMode.STREAM;
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
 
+    // create the physical execution plan
     ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
     return planner.plan(specGraph);
   }
index 6aeb2ba..ea55fe5 100644 (file)
@@ -34,7 +34,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class ChangelogStreamManager {
         .stream()
         .filter(name -> StringUtils.isNotBlank(storageConfig.getChangelogStream(name)))
         .collect(Collectors.toMap(name -> name,
-            name -> Util.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
+            name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
 
     // Get SystemAdmin for changelog store's system and attempt to create the stream
     JavaSystemConfig systemConfig = new JavaSystemConfig(config);
index c807b02..f9c6c0c 100644 (file)
@@ -52,6 +52,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -151,7 +152,7 @@ public class StorageRecovery extends CommandLine {
       log.info("stream name for " + storeName + " is " + streamName);
 
       if (streamName != null) {
-        changeLogSystemStreams.put(storeName, Util.getSystemStreamFromNames(streamName));
+        changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName));
       }
 
       String factoryClass = config.getStorageFactoryClassName(storeName);
diff --git a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
new file mode 100644 (file)
index 0000000..e7a1e54
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+
+public class StreamUtil {
+  /**
+   * Gets the {@link SystemStream} corresponding to the provided stream, which may be
+   * a streamId or a stream name in the format systemName.streamName.
+   *
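+   * @param config the config used to resolve the streamId to its {@link SystemStream}, when a streamId is provided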
+   * @param stream the stream name or id to get the {@link SystemStream} for.
+   * @return the {@link SystemStream} for the stream
+   */
+  public static SystemStream getSystemStreamFromNameOrId(Config config, String stream) {
+    String[] parts = stream.split("\\.");
+    if (parts.length == 0 || parts.length > 2) {
+      throw new SamzaException(
+          String.format("Invalid stream %s. Expected to be of the format streamId or systemName.streamName", stream));
+    }
+    if (parts.length == 1) {
+      return new StreamConfig(config).streamIdToSystemStream(stream);
+    } else {
+      return new SystemStream(parts[0], parts[1]);
+    }
+  }
+
+  /**
+   * Returns a SystemStream object based on the system stream name given. For
+   * example, kafka.topic would return SystemStream("kafka", "topic").
+   *
+   * @param systemStreamName name of the system stream
+   * @return the {@link SystemStream} for the {@code systemStreamName}
+   */
+  public static SystemStream getSystemStreamFromNames(String systemStreamName) {
+    int idx = systemStreamName.indexOf('.');
+    if (idx < 0) {
+      throw new IllegalArgumentException("No '.' in stream name '" + systemStreamName +
+          "'. Stream names should be in the form 'system.stream'");
+    }
+    return new SystemStream(
+        systemStreamName.substring(0, idx),
+        systemStreamName.substring(idx + 1, systemStreamName.length()));
+  }
+
+  /**
+   * Returns the period separated system stream name for the provided {@code systemStream}. For
+   * example, SystemStream("kafka", "topic") would return "kafka.topic".
+   *
+   * @param systemStream the {@link SystemStream} to get the name for
+   * @return the system stream name
+   */
+  public static String getNameFromSystemStream(SystemStream systemStream) {
+    return systemStream.getSystem() + "." + systemStream.getStream();
+  }
+
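+  /**
+   * Gets the {@link StreamSpec}s for the provided streamIds using the provided {@link StreamConfig}.
+   *
+   * @param streamIds the stream ids to get the {@link StreamSpec}s for
+   * @param streamConfig the {@link StreamConfig} to read the stream configurations from
+   * @return the set of {@link StreamSpec}s for the provided streamIds
+   */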
+  public static Set<StreamSpec> getStreamSpecs(Set<String> streamIds, StreamConfig streamConfig) {
+    return streamIds.stream().map(streamId -> getStreamSpec(streamId, streamConfig)).collect(Collectors.toSet());
+  }
+
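+  /**
+   * Gets the {@link StreamSpec} for the provided streamId using the provided {@link StreamConfig}.
+   *
+   * @param streamId the stream id to get the {@link StreamSpec} for
+   * @param streamConfig the {@link StreamConfig} to read the system, physical name and stream properties from
+   * @return the {@link StreamSpec} for the provided streamId
+   */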
+  public static StreamSpec getStreamSpec(String streamId, StreamConfig streamConfig) {
+    String physicalName = streamConfig.getPhysicalName(streamId);
+    String system = streamConfig.getSystem(streamId);
+    Map<String, String> streamProperties = streamConfig.getStreamProperties(streamId);
+    return new StreamSpec(streamId, physicalName, system, streamProperties);
+  }
+}
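
A minimal usage sketch for the new utility. The streamId, stream names, and config entries below are illustrative assumptions (e.g. a config defining streams.pageviews.samza.system), not part of this patch:

    // Illustrative sketch of the new StreamUtil helpers.
    import org.apache.samza.config.Config;
    import org.apache.samza.config.StreamConfig;
    import org.apache.samza.system.StreamSpec;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.util.StreamUtil;

    public class StreamUtilUsageExample {
      public static void resolveStreams(Config config) {
        StreamConfig streamConfig = new StreamConfig(config);
        // Logical streamId -> StreamSpec, with system, physical name and properties read from config.
        StreamSpec pageViewSpec = StreamUtil.getStreamSpec("pageviews", streamConfig);
        // "system.stream" string -> SystemStream, e.g. SystemStream("kafka", "PageViewEvent").
        SystemStream byName = StreamUtil.getSystemStreamFromNames("kafka.PageViewEvent");
        // Either a streamId or a "system.stream" string -> SystemStream.
        SystemStream byId = StreamUtil.getSystemStreamFromNameOrId(config, "pageviews");
        // SystemStream -> period separated name, e.g. "kafka.PageViewEvent".
        String name = StreamUtil.getNameFromSystemStream(byName);
      }
    }
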
index c9df3b5..42b6130 100644 (file)
@@ -22,8 +22,7 @@ package org.apache.samza.config
 
 import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, StreamUtil}
 
 object StorageConfig {
   // stream config constants
@@ -106,7 +105,7 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
       .map(getChangelogStream(_))
       .filter(_.isDefined)
       // Convert "system.stream" to systemName
-      .map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem)
+      .map(systemStreamName => StreamUtil.getSystemStreamFromNames(systemStreamName.get).getSystem)
       .contains(systemName)
   }
 
index ab11785..a64589f 100644 (file)
@@ -22,7 +22,7 @@ package org.apache.samza.config
 import org.apache.samza.checkpoint.CheckpointManager
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStream
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{Logging, StreamUtil}
 
 object TaskConfig {
   // task config constants
@@ -78,7 +78,7 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
     case Some(streams) => if (streams.length > 0) {
       streams.split(",").map(systemStreamNames => {
-        Util.getSystemStreamFromNames(systemStreamNames.trim)
+        StreamUtil.getSystemStreamFromNames(systemStreamNames.trim)
       }).toSet
     } else {
       Set[SystemStream]()
index 35802ac..bb1b1cf 100644 (file)
@@ -326,7 +326,7 @@ object SamzaContainer extends Logging {
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
       .map(name => (name, config.getChangelogStream(name).get)).toMap
-      .mapValues(Util.getSystemStreamFromNames(_))
+      .mapValues(StreamUtil.getSystemStreamFromNames(_))
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
@@ -357,7 +357,7 @@ object SamzaContainer extends Logging {
     val sideInputStoresToSystemStreams = config.getStoreNames
       .map { storeName => (storeName, config.getSideInputs(storeName)) }
       .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
-      .map { case (storeName, sideInputs) => (storeName, sideInputs.map(Util.getSystemStreamFromNameOrId(config, _))) }
+      .map { case (storeName, sideInputs) => (storeName, sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
       .toMap
 
     info("Got side input store system streams: %s" format sideInputStoresToSystemStreams)
index 7b83874..0b472aa 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.operators.StreamGraphSpec
-import org.apache.samza.runtime.LocalContainerRunner
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
@@ -72,10 +71,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val containerId = "0"
     val jmxServer = new JmxServer
     val streamApp = TaskFactoryUtil.createStreamApplication(config)
-    val appRunner = new LocalContainerRunner(jobModel, "0")
 
     val taskFactory = if (streamApp != null) {
-      val graphSpec = new StreamGraphSpec(appRunner, config)
+      val graphSpec = new StreamGraphSpec(config)
       streamApp.init(graphSpec, config)
       TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager)
     } else {
index 33802a1..e41d4a8 100644 (file)
@@ -19,7 +19,7 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Logging, StreamUtil, Util}
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{ApplicationConfig, Config}
 import org.apache.samza.config.JobConfig.Config2Job
@@ -30,7 +30,6 @@ import org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.util.Util
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.system.SystemFactory
@@ -68,7 +67,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
       .getMetricsReporterStream(name)
       .getOrElse(throw new SamzaException("No metrics stream defined in config."))
 
-    val systemStream = Util.getSystemStreamFromNames(metricsSystemStreamName)
+    val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
 
     info("Got system stream %s." format systemStream)
 
index fd06c20..53b7d19 100644 (file)
@@ -22,9 +22,7 @@ package org.apache.samza.util
 
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config._
-import org.apache.samza.system.SystemStream
 import org.apache.samza.SamzaException
-
 import java.lang.management.ManagementFactory
 import java.net.Inet4Address
 import java.net.InetAddress
@@ -67,46 +65,6 @@ object Util extends Logging {
   }
 
   /**
-   * Returns a SystemStream object based on the system stream name given. For
-   * example, kafka.topic would return new SystemStream("kafka", "topic").
-   */
-  def getSystemStreamFromNames(systemStreamNames: String): SystemStream = {
-    val idx = systemStreamNames.indexOf('.')
-    if (idx < 0) {
-      throw new IllegalArgumentException("No '.' in stream name '" + systemStreamNames + "'. Stream names should be in the form 'system.stream'")
-    }
-    new SystemStream(systemStreamNames.substring(0, idx), systemStreamNames.substring(idx + 1, systemStreamNames.length))
-  }
-
-  /**
-   * Returns a SystemStream object based on the system stream name given. For
-   * example, kafka.topic would return new SystemStream("kafka", "topic").
-   */
-  def getNameFromSystemStream(systemStream: SystemStream) = {
-    systemStream.getSystem + "." + systemStream.getStream
-  }
-
-  /**
-    * Gets the [[SystemStream]] corresponding to the provided stream, which may be
-    * a streamId, or stream name of the format systemName.streamName.
-    *
-    * @param stream the stream name or id to get the { @link SystemStream} for.
-    * @return the [[SystemStream]] for the stream
-    */
-  def getSystemStreamFromNameOrId(config: Config, stream: String): SystemStream = {
-    val parts = stream.split("\\.")
-    if (parts.length == 0 || parts.length > 2) {
-      throw new SamzaException(
-        String.format("Invalid stream %s. Expected to be of the format streamId or systemName.streamName", stream))
-    }
-    if (parts.length == 1) {
-      new StreamConfig(config).streamIdToSystemStream(stream)
-    } else {
-      new SystemStream(parts(0), parts(1))
-    }
-  }
-
-  /**
    * Returns the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback
    *
    * @return the [[java.net.InetAddress]] which represents the localhost
index 83fe5ad..4d25ebb 100644 (file)
@@ -38,13 +38,13 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -59,7 +59,6 @@ public class TestExecutionPlanner {
 
   private SystemAdmins systemAdmins;
   private StreamManager streamManager;
-  private ApplicationRunner runner;
   private Config config;
 
   private StreamSpec input1;
@@ -104,7 +103,7 @@ public class TestExecutionPlanner {
      * input1 -> partitionBy -> map -> output1
      *
      */
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input1");
     OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1");
     input1
@@ -127,7 +126,7 @@ public class TestExecutionPlanner {
      *
      */
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> messageStream1 =
         graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
@@ -159,7 +158,7 @@ public class TestExecutionPlanner {
 
   private StreamGraphSpec createStreamGraphWithJoinAndWindow() {
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> messageStream1 =
         graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
@@ -207,7 +206,12 @@ public class TestExecutionPlanner {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM);
-
+    StreamTestUtils.addStreamConfigs(configMap, "input1", "system1", "input1");
+    StreamTestUtils.addStreamConfigs(configMap, "input2", "system2", "input2");
+    StreamTestUtils.addStreamConfigs(configMap, "input3", "system2", "input3");
+    StreamTestUtils.addStreamConfigs(configMap, "input4", "system1", "input4");
+    StreamTestUtils.addStreamConfigs(configMap, "output1", "system1", "output1");
+    StreamTestUtils.addStreamConfigs(configMap, "output2", "system2", "output2");
     config = new MapConfig(configMap);
 
     input1 = new StreamSpec("input1", "input1", "system1");
@@ -234,22 +238,6 @@ public class TestExecutionPlanner {
     when(systemAdmins.getSystemAdmin("system1")).thenReturn(systemAdmin1);
     when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
     streamManager = new StreamManager(systemAdmins);
-
-    runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("input4")).thenReturn(input4);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-p1"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-p2"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-p3"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p3", "test-app-1-partition_by-p3", "default-system"));
   }
 
   @Test
@@ -288,7 +276,7 @@ public class TestExecutionPlanner {
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(jobGraph);
+    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, config);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -406,13 +394,13 @@ public class TestExecutionPlanner {
   @Test
   public void testMaxPartition() {
     Collection<StreamEdge> edges = new ArrayList<>();
-    StreamEdge edge = new StreamEdge(input1, config);
+    StreamEdge edge = new StreamEdge(input1, false, false, config);
     edge.setPartitionCount(2);
     edges.add(edge);
-    edge = new StreamEdge(input2, config);
+    edge = new StreamEdge(input2, false, false, config);
     edge.setPartitionCount(32);
     edges.add(edge);
-    edge = new StreamEdge(input3, config);
+    edge = new StreamEdge(input3, false, false, config);
     edge.setPartitionCount(16);
     edges.add(edge);
 
@@ -427,7 +415,7 @@ public class TestExecutionPlanner {
     int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
     MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input4");
     OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1");
index 359c422..73452d8 100644 (file)
 
 package org.apache.samza.execution;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestJobGraph {
@@ -57,7 +61,9 @@ public class TestJobGraph {
    * 2 9 10
    */
   private void createGraph1() {
-    graph1 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph1 = new JobGraph(null, specGraph);
 
     JobNode n2 = graph1.getOrCreateJobNode("2", "1");
     JobNode n3 = graph1.getOrCreateJobNode("3", "1");
@@ -90,7 +96,9 @@ public class TestJobGraph {
    *      |<---6 <--|    <>
    */
   private void createGraph2() {
-    graph2 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph2 = new JobGraph(null, specGraph);
 
     JobNode n1 = graph2.getOrCreateJobNode("1", "1");
     JobNode n2 = graph2.getOrCreateJobNode("2", "1");
@@ -117,7 +125,9 @@ public class TestJobGraph {
    * 1<->1 -> 2<->2
    */
   private void createGraph3() {
-    graph3 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph3 = new JobGraph(null, specGraph);
 
     JobNode n1 = graph3.getOrCreateJobNode("1", "1");
     JobNode n2 = graph3.getOrCreateJobNode("2", "1");
@@ -133,7 +143,9 @@ public class TestJobGraph {
    * 1<->1
    */
   private void createGraph4() {
-    graph4 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph4 = new JobGraph(null, specGraph);
 
     JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
@@ -151,7 +163,9 @@ public class TestJobGraph {
 
   @Test
   public void testAddSource() {
-    JobGraph graph = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    JobGraph graph = new JobGraph(null, specGraph);
 
     /**
      * s1 -> 1
@@ -192,7 +206,9 @@ public class TestJobGraph {
      * 2 -> s2
      * 2 -> s3
      */
-    JobGraph graph = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    JobGraph graph = new JobGraph(null, specGraph);
     JobNode n1 = graph.getOrCreateJobNode("1", "1");
     JobNode n2 = graph.getOrCreateJobNode("2", "1");
     StreamSpec s1 = genStream();
index abe8969..b0f3843 100644 (file)
@@ -31,15 +31,14 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.LongSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
@@ -76,28 +75,13 @@ public class TestJobGraphJsonGenerator {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    StreamTestUtils.addStreamConfigs(configMap, "input1", "system1", "input1");
+    StreamTestUtils.addStreamConfigs(configMap, "input2", "system2", "input2");
+    StreamTestUtils.addStreamConfigs(configMap, "input3", "system2", "input3");
+    StreamTestUtils.addStreamConfigs(configMap, "output1", "system1", "output1");
+    StreamTestUtils.addStreamConfigs(configMap, "output2", "system2", "output2");
     Config config = new MapConfig(configMap);
 
-    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
-    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
-    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
-
-    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
-    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
-
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-p1"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-p2"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"));
-
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
     system1Map.put("input1", 64);
@@ -114,7 +98,7 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
     MessageStream<KV<Object, Object>> messageStream1 =
         graphSpec.<KV<Object, Object>>getInputStream("input1")
@@ -163,19 +147,10 @@ public class TestJobGraphJsonGenerator {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    StreamTestUtils.addStreamConfigs(configMap, "PageView", "hdfs", "hdfs:/user/dummy/PageViewEvent");
+    StreamTestUtils.addStreamConfigs(configMap, "PageViewCount", "kafka", "PageViewCount");
     Config config = new MapConfig(configMap);
 
-    StreamSpec input = new StreamSpec("PageView", "hdfs:/user/dummy/PageViewEvent", "hdfs");
-    StreamSpec output = new StreamSpec("PageViewCount", "PageViewCount", "kafka");
-
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("PageView")).thenReturn(input);
-    when(runner.getStreamSpec("PageViewCount")).thenReturn(output);
-
-    // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-keyed-by-country"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-keyed-by-country", "test-app-1-partition_by-keyed-by-country", "kafka"));
-
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
     system1Map.put("hdfs:/user/dummy/PageViewEvent", 512);
@@ -189,7 +164,7 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<String, PageViewEvent>> inputStream = graphSpec.getInputStream("PageView");
     inputStream
         .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country")
index c43e242..cefe128 100644 (file)
@@ -29,7 +29,6 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
@@ -48,7 +47,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -56,22 +54,17 @@ public class TestJobNode {
 
   @Test
   public void testAddSerdeConfigs() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
     StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
     StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
     StreamSpec partitionBySpec =
         new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system");
-    doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
-    doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
-    doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
-    doReturn(partitionBySpec).when(mockRunner).getStreamSpec("jobName-jobId-partition_by-p1");
 
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     graphSpec.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
     MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream("input1");
     MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream("input2");
@@ -86,10 +79,10 @@ public class TestJobNode {
 
     JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig);
     Config config = new MapConfig();
-    StreamEdge input1Edge = new StreamEdge(input1Spec, config);
-    StreamEdge input2Edge = new StreamEdge(input2Spec, config);
-    StreamEdge outputEdge = new StreamEdge(outputSpec, config);
-    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config);
+    StreamEdge input1Edge = new StreamEdge(input1Spec, false, false, config);
+    StreamEdge input2Edge = new StreamEdge(input2Spec, false, false, config);
+    StreamEdge outputEdge = new StreamEdge(outputSpec, false, false, config);
+    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, config);
     jobNode.addInEdge(input1Edge);
     jobNode.addInEdge(input2Edge);
     jobNode.addOutEdge(outputEdge);
index 8ad5b7e..5928db1 100644 (file)
@@ -37,7 +37,7 @@ public class TestStreamEdge {
 
   @Test
   public void testGetStreamSpec() {
-    StreamEdge edge = new StreamEdge(spec, false, new MapConfig());
+    StreamEdge edge = new StreamEdge(spec, false, false, new MapConfig());
     assertEquals(edge.getStreamSpec(), spec);
     assertEquals(edge.getStreamSpec().getPartitionCount(), 1 /*StreamSpec.DEFAULT_PARTITION_COUNT*/);
 
@@ -50,32 +50,30 @@ public class TestStreamEdge {
     Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
     config.put(ApplicationConfig.APP_RUN_ID, "123");
-    StreamEdge edge = new StreamEdge(spec, true, new MapConfig(config));
+    StreamEdge edge = new StreamEdge(spec, true, false, new MapConfig(config));
     assertEquals(edge.getStreamSpec().getPhysicalName(), spec.getPhysicalName() + "-123");
   }
 
   @Test
   public void testGenerateConfig() {
     // an example unbounded IO stream
-    StreamSpec spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", false, Collections.singletonMap("property1", "haha"));
-    StreamEdge edge = new StreamEdge(spec, false, new MapConfig());
+    StreamSpec spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", Collections.singletonMap("property1", "haha"));
+    StreamEdge edge = new StreamEdge(spec, false, false, new MapConfig());
     Config config = edge.generateConfig();
     StreamConfig streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getSystem(spec.getId()), "system-1");
     assertEquals(streamConfig.getPhysicalName(spec.getId()), "physical-stream-1");
     assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), false);
-    assertEquals(streamConfig.getIsBounded(spec.getId()), false);
     assertEquals(streamConfig.getStreamProperties(spec.getId()).get("property1"), "haha");
 
     // bounded stream
-    spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", true, Collections.singletonMap("property1", "haha"));
-    edge = new StreamEdge(spec, false, new MapConfig());
+    spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", Collections.singletonMap("property1", "haha"));
+    edge = new StreamEdge(spec, false, false, new MapConfig());
     config = edge.generateConfig();
     streamConfig = new StreamConfig(config);
-    assertEquals(streamConfig.getIsBounded(spec.getId()), true);
 
     // intermediate stream
-    edge = new StreamEdge(spec, true, new MapConfig());
+    edge = new StreamEdge(spec, true, false, new MapConfig());
     config = edge.generateConfig();
     streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), true);
index 602b595..7054727 100644 (file)
@@ -28,12 +28,10 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TestInMemoryStore;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
@@ -74,7 +72,6 @@ public class TestJoinOperator {
   @Before
   public void setUp() {
     Map<String, String> mapConfig = new HashMap<>();
-    mapConfig.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
     mapConfig.put("job.default.system", "insystem");
     mapConfig.put("job.name", "jobName");
     mapConfig.put("job.id", "jobId");
@@ -101,7 +98,7 @@ public class TestJoinOperator {
   public void joinWithSelfThrowsException() throws Exception {
     config.put("streams.instream.system", "insystem");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(ApplicationRunner.class), config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     IntegerSerde integerSerde = new IntegerSerde();
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
     MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde);
@@ -320,11 +317,7 @@ public class TestJoinOperator {
   }
 
   private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException {
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
-    when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem"));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     IntegerSerde integerSerde = new IntegerSerde();
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
     MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde);
index 2be88ca..4cfc66a 100644 (file)
@@ -38,7 +38,6 @@ import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.system.StreamSpec;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,8 +58,8 @@ import static org.mockito.Mockito.*;
 public class TestOperatorSpecGraph {
 
   private StreamGraphSpec mockGraph;
-  private Map<StreamSpec, InputOperatorSpec> inputOpSpecMap;
-  private Map<StreamSpec, OutputStreamImpl> outputStrmMap;
+  private Map<String, InputOperatorSpec> inputOpSpecMap;
+  private Map<String, OutputStreamImpl> outputStrmMap;
   private Set<OperatorSpec> allOpSpecs;
 
   @Before
@@ -72,26 +71,26 @@ public class TestOperatorSpecGraph {
      * 1) input1 --> filter --> sendTo
      * 2) input2 --> map --> sink
      */
-    StreamSpec testInputSpec = new StreamSpec("test-input-1", "test-input-1", "kafka");
-    InputOperatorSpec testInput = new InputOperatorSpec(testInputSpec, new NoOpSerde(), new NoOpSerde(), true, "test-input-1");
+    String inputStreamId1 = "test-input-1";
+    String outputStreamId = "test-output-1";
+    InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new NoOpSerde(), new NoOpSerde(), true, inputStreamId1);
     StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> true, "test-filter-2");
-    StreamSpec testOutputSpec = new StreamSpec("test-output-1", "test-output-1", "kafka");
-    OutputStreamImpl outputStream1 = new OutputStreamImpl(testOutputSpec, null, null, true);
+    OutputStreamImpl outputStream1 = new OutputStreamImpl(outputStreamId, null, null, true);
     OutputOperatorSpec outputSpec = OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3");
     testInput.registerNextOperatorSpec(filterOp);
     filterOp.registerNextOperatorSpec(outputSpec);
-    StreamSpec testInputSpec2 = new StreamSpec("test-input-2", "test-input-2", "kafka");
-    InputOperatorSpec testInput2 = new InputOperatorSpec(testInputSpec2, new NoOpSerde(), new NoOpSerde(), true, "test-input-4");
+    String streamId2 = "test-input-2";
+    InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new NoOpSerde(), new NoOpSerde(), true, "test-input-4");
     StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, "test-map-5");
     SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, tc) -> { }, "test-sink-6");
     testInput2.registerNextOperatorSpec(testMap);
     testMap.registerNextOperatorSpec(testSink);
 
     this.inputOpSpecMap = new LinkedHashMap<>();
-    inputOpSpecMap.put(testInputSpec, testInput);
-    inputOpSpecMap.put(testInputSpec2, testInput2);
+    inputOpSpecMap.put(inputStreamId1, testInput);
+    inputOpSpecMap.put(streamId2, testInput2);
     this.outputStrmMap = new LinkedHashMap<>();
-    outputStrmMap.put(testOutputSpec, outputStream1);
+    outputStrmMap.put(outputStreamId, outputStream1);
     when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
     when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
     this.allOpSpecs = new HashSet<OperatorSpec>() { {
index e476abc..dfb4b70 100644 (file)
@@ -31,11 +31,9 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
@@ -53,82 +51,71 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetInputStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
+    String streamId = "test-stream-1";
     Serde mockValueSerde = mock(Serde.class);
-    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1", mockValueSerde);
+    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId, mockValueSerde);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test
   public void testGetInputStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
+    String streamId = "test-stream-1";
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1", mockKVSerde);
+    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId, mockKVSerde);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test(expected = NullPointerException.class)
   public void testGetInputStreamWithNullSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     graphSpec.getInputStream("test-stream-1", null);
   }
 
   @Test
   public void testGetInputStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     graphSpec.setDefaultSerde(mockValueSerde);
-    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test
   public void testGetInputStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -136,63 +123,56 @@ public class TestStreamGraphSpec {
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
-    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test
   public void testGetInputStreamWithDefaultDefaultSerde() {
-    // default default serde == user hasn't provided a default serde
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
 
-    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+    // default default serde == user hasn't provided a default serde
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
   }
 
   @Test
   public void testGetInputStreamWithRelaxedTypes() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+    MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
   }
 
   @Test
   public void testMultipleGetInputStreams() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
-    StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
-    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+    String streamId1 = "test-stream-1";
+    String streamId2 = "test-stream-2";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream("test-stream-1");
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream("test-stream-2");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    MessageStream<Object> inputStream1 = graphSpec.getInputStream(streamId1);
+    MessageStream<Object> inputStream2 = graphSpec.getInputStream(streamId2);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
@@ -200,98 +180,83 @@ public class TestStreamGraphSpec {
         (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
 
     assertEquals(graphSpec.getInputOperators().size(), 2);
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
+    assertEquals(graphSpec.getInputOperators().get(streamId1), inputOpSpec1);
+    assertEquals(graphSpec.getInputOperators().get(streamId2), inputOpSpec2);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameInputStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-    graphSpec.getInputStream("test-stream-1");
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getInputStream(streamId);
     // should throw exception
-    graphSpec.getInputStream("test-stream-1");
+    graphSpec.getInputStream(streamId);
   }
 
   @Test
   public void testGetOutputStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     OutputStream<TestMessageEnvelope> outputStream =
-        graphSpec.getOutputStream("test-stream-1", mockValueSerde);
+        graphSpec.getOutputStream(streamId, mockValueSerde);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
   public void testGetOutputStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
-    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1", mockKVSerde);
+    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId, mockKVSerde);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test(expected = NullPointerException.class)
   public void testGetOutputStreamWithNullSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    graphSpec.getOutputStream("test-stream-1", null);
+    graphSpec.getOutputStream(streamId, null);
   }
 
   @Test
   public void testGetOutputStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    String streamId = "test-stream-1";
 
     Serde mockValueSerde = mock(Serde.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     graphSpec.setDefaultSerde(mockValueSerde);
-    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1");
+    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
   public void testGetOutputStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    String streamId = "test-stream-1";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
@@ -299,89 +264,75 @@ public class TestStreamGraphSpec {
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
 
-    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1");
+    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
   public void testGetOutputStreamWithDefaultDefaultSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    String streamId = "test-stream-1";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1");
+    OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingStreams() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+    String streamId = "test-stream-1";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-    graphSpec.getInputStream("test-stream-1");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getInputStream(streamId);
     graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingOutputStream() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-    graphSpec.getOutputStream("test-stream-1");
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getOutputStream(streamId);
     graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingIntermediateStream() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-    graphSpec.getIntermediateStream("test-stream-1", null);
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getIntermediateStream(streamId, null);
     graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameOutputStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-    graphSpec.getOutputStream("test-stream-1");
-    graphSpec.getOutputStream("test-stream-1"); // should throw exception
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getOutputStream(streamId);
+    graphSpec.getOutputStream(streamId); // should throw exception
   }
 
   @Test
   public void testGetIntermediateStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, mockValueSerde);
+        graphSpec.getIntermediateStream(streamId, mockValueSerde);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
     assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
@@ -390,13 +341,8 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -404,11 +350,11 @@ public class TestStreamGraphSpec {
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, mockKVSerde);
+        graphSpec.getIntermediateStream(streamId, mockKVSerde);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
     assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
     assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
@@ -417,22 +363,17 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graph = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+    StreamGraphSpec graph = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     graph.setDefaultSerde(mockValueSerde);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, null);
+        graph.getIntermediateStream(streamId, null);
 
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graph.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
     assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
@@ -441,13 +382,10 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -456,11 +394,11 @@ public class TestStreamGraphSpec {
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, null);
+        graphSpec.getIntermediateStream(streamId, null);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
     assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
     assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
@@ -469,19 +407,16 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithDefaultDefaultSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, null);
+        graphSpec.getIntermediateStream(streamId, null);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
     assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
     assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
@@ -490,22 +425,18 @@ public class TestStreamGraphSpec {
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameIntermediateStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
     graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
   }
 
   @Test
   public void testGetNextOpIdIncrementsId() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     assertEquals("jobName-1234-merge-0", graphSpec.getNextOpId(OpCode.MERGE, null));
     assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName"));
     assertEquals("jobName-1234-map-2", graphSpec.getNextOpId(OpCode.MAP, null));
@@ -513,24 +444,22 @@ public class TestStreamGraphSpec {
 
   @Test(expected = SamzaException.class)
   public void testGetNextOpIdRejectsDuplicates() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName"));
     graphSpec.getNextOpId(OpCode.JOIN, "customName"); // should throw
   }
 
   @Test
   public void testUserDefinedIdValidation() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
     // null and empty userDefinedIDs should fall back to autogenerated IDs.
     try {
@@ -562,36 +491,29 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetInputStreamPreservesInsertionOrder() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
-
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
-
-    StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
+    String testStreamId1 = "test-stream-1";
+    String testStreamId2 = "test-stream-2";
+    String testStreamId3 = "test-stream-3";
+
     graphSpec.getInputStream("test-stream-1");
     graphSpec.getInputStream("test-stream-2");
     graphSpec.getInputStream("test-stream-3");
 
     List<InputOperatorSpec> inputSpecs = new ArrayList<>(graphSpec.getInputOperators().values());
     assertEquals(inputSpecs.size(), 3);
-    assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
-    assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
-    assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
+    assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
+    assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
+    assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
   }
 
   @Test
   public void testGetTable() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
     BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
     when(mockTableDescriptor.getTableSpec()).thenReturn(
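
Note on the rewritten TestStreamGraphSpec tests above: StreamGraphSpec is now constructed from a Config alone, and the input-operator and output-stream maps are keyed by the logical streamId rather than a StreamSpec. The snippet below is a minimal sketch, not taken from this patch, of how a streamId would be bound to a concrete system and physical name purely through configuration; the streams.&lt;id&gt;.samza.* keys and the example values are illustrative assumptions, and the actual resolution happens later in OperatorImplGraph via StreamConfig, not in StreamGraphSpec.

    // Minimal sketch; the keys and values below are illustrative assumptions, not part of the patch.
    Map<String, String> configs = new HashMap<>();
    configs.put(JobConfig.JOB_NAME(), "jobName");
    configs.put(JobConfig.JOB_ID(), "1");
    configs.put("streams.test-stream-1.samza.system", "test-system");
    configs.put("streams.test-stream-1.samza.physical.name", "physical-stream-1");
    StreamGraphSpec graphSpec = new StreamGraphSpec(new MapConfig(configs));
    MessageStream<Object> input = graphSpec.getInputStream("test-stream-1");
    // The DAG records only the logical id "test-stream-1"; OperatorImplGraph later
    // resolves it to SystemStream("test-system", "physical-stream-1") using StreamConfig.
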
index 6fdcacc..4e77fae 100644
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -36,6 +38,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
@@ -45,8 +48,8 @@ import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.ClosableFunction;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.InitableFunction;
@@ -54,20 +57,18 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
-import java.util.List;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.junit.After;
@@ -76,7 +77,6 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -217,7 +217,7 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testEmptyChain() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(ApplicationRunner.class), mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     OperatorImplGraph opGraph =
         new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
     assertEquals(0, opGraph.getAllInputOperators().size());
@@ -225,13 +225,26 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testLinearChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
-    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
-
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
-    OutputStream<Object> outputStream = graphSpec.getOutputStream("output");
+    String inputStreamId = "input";
+    String inputSystem = "input-system";
+    String inputPhysicalName = "input-stream";
+    String outputStreamId = "output";
+    String outputSystem = "output-system";
+    String outputPhysicalName = "output-stream";
+    String intermediateSystem = "intermediate-system";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystem);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
+    Config config = new MapConfig(configs);
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+
+    MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId);
+    OutputStream<Object> outputStream = graphSpec.getOutputStream(outputStreamId);
 
     inputStream
         .filter(mock(FilterFunction.class))
@@ -242,9 +255,9 @@ public class TestOperatorImplGraph {
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
 
-    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
     assertEquals(1, inputOpImpl.registeredOperators.size());
 
     OperatorImpl filterOpImpl = (StreamOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
@@ -262,18 +275,27 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testPartitionByChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
-    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system"));
-    when(mockRunner.getStreamSpec(eq("jobName-jobId-partition_by-p1")))
-        .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system"));
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+    String inputStreamId = "input";
+    String inputSystem = "input-system";
+    String inputPhysicalName = "input-stream";
+    String outputStreamId = "output";
+    String outputSystem = "output-system";
+    String outputPhysicalName = "output-stream";
+    String intermediateStreamId = "jobName-jobId-partition_by-p1";
+    String intermediateSystem = "intermediate-system";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystem);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
+    Config config = new MapConfig(configs);
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+    MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId);
     OutputStream<KV<Integer, String>> outputStream = graphSpec
-        .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+        .getOutputStream(outputStreamId, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
 
     inputStream
         .partitionBy(Object::hashCode, Object::toString,
@@ -291,12 +313,12 @@ public class TestOperatorImplGraph {
     when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
     when(mockTaskContext.getJobModel()).thenReturn(jobModel);
     SamzaContainerContext containerContext =
-        new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
+        new SamzaContainerContext("0", config, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
     when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
 
-    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
     assertEquals(1, inputOpImpl.registeredOperators.size());
 
     OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
@@ -304,7 +326,7 @@ public class TestOperatorImplGraph {
     assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode());
 
     InputOperatorImpl repartitionedInputOpImpl =
-        opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream"));
+        opImplGraph.getInputOperator(new SystemStream(intermediateSystem, intermediateStreamId));
     assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
 
     OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next();
@@ -314,18 +336,20 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testBroadcastChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String inputStreamId = "input";
+    HashMap<String, String> configMap = new HashMap<>();
+    StreamTestUtils.addStreamConfigs(configMap, "input", "input-system", "input-stream");
+    Config config = new MapConfig(configMap);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+    MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId);
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
     assertEquals(2, inputOpImpl.registeredOperators.size());
@@ -337,12 +361,10 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testMergeChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input")))
-        .thenReturn(new StreamSpec("input", "input-stream", "input-system"));
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+    String inputStreamId = "input";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+    MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId);
     MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
     MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
     MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
@@ -372,20 +394,23 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testJoinChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
-    when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    StreamTestUtils.addStreamConfigs(configs, "input1", "input-system", "input-stream1");
+    StreamTestUtils.addStreamConfigs(configs, "input2", "input-system", "input-stream2");
+    Config config = new MapConfig(configs);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
     Integer joinKey = new Integer(1);
     Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
     JoinFunction testJoinFunction = new TestJoinFunction("jobName-jobId-join-j1",
         (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn);
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1", new NoOpSerde<>());
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2", new NoOpSerde<>());
+    MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputStreamId1, new NoOpSerde<>());
+    MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputStreamId2, new NoOpSerde<>());
     inputStream1.join(inputStream2, testJoinFunction,
         mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1");
 
@@ -398,7 +423,7 @@ public class TestOperatorImplGraph {
     KeyValueStore mockRightStore = mock(KeyValueStore.class);
     when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
 
     // verify that join function is initialized once.
     assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1);
@@ -434,18 +459,17 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testOperatorGraphInitAndClose() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
-    when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
     Config mockConfig = mock(Config.class);
     TaskName mockTaskName = mock(TaskName.class);
     TaskContextImpl mockContext = mock(TaskContextImpl.class);
     when(mockContext.getTaskName()).thenReturn(mockTaskName);
     when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1");
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2");
+    MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputStreamId1);
+    MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputStreamId2);
 
     Function mapFn = (Function & Serializable) m -> m;
     inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
@@ -476,12 +500,19 @@ public class TestOperatorImplGraph {
   @Test
   public void testGetStreamToConsumerTasks() {
     String system = "test-system";
-    String stream0 = "test-stream-0";
-    String stream1 = "test-stream-1";
+    String streamId0 = "test-stream-0";
+    String streamId1 = "test-stream-1";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "test-app");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    StreamTestUtils.addStreamConfigs(configs, streamId0, system, streamId0);
+    StreamTestUtils.addStreamConfigs(configs, streamId1, system, streamId1);
+    Config config = new MapConfig(configs);
 
-    SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0));
+    SystemStreamPartition ssp0 = new SystemStreamPartition(system, streamId0, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(system, streamId0, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(system, streamId1, new Partition(0));
 
     TaskName task0 = new TaskName("Task 0");
     TaskName task1 = new TaskName("Task 1");
@@ -497,7 +528,7 @@ public class TestOperatorImplGraph {
     cms.put(cm0.getProcessorId(), cm0);
     cms.put(cm1.getProcessorId(), cm1);
 
-    JobModel jobModel = new JobModel(new MapConfig(), cms, null);
+    JobModel jobModel = new JobModel(config, cms, null);
     Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel);
     assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
     assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
@@ -505,56 +536,44 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testGetOutputToInputStreams() {
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "test-app");
-    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
-    Config config = new MapConfig(configMap);
-
-    /**
-     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
-     *
-     *                                    input1 -> map -> join -> partitionBy (10) -> output1
-     *                                                       |
-     *                                     input2 -> filter -|
-     *                                                       |
-     *           input3 -> filter -> partitionBy -> map -> join -> output2
-     *
-     */
-    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
-    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
-    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
-
-    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
-    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
-
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system");
-    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system");
-    when(runner.getStreamSpec("test-app-1-partition_by-p2")).thenReturn(int1);
-    when(runner.getStreamSpec("test-app-1-partition_by-p1")).thenReturn(int2);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
-    MessageStream messageStream1 = graphSpec.getInputStream("input1").map(m -> m);
-    MessageStream messageStream2 = graphSpec.getInputStream("input2").filter(m -> true);
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
+    String inputStreamId3 = "input3";
+    String inputSystem = "input-system";
+
+    String outputStreamId1 = "output1";
+    String outputStreamId2 = "output2";
+    String outputSystem = "output-system";
+
+    String intStreamId1 = "test-app-1-partition_by-p1";
+    String intStreamId2 = "test-app-1-partition_by-p2";
+    String intSystem = "test-system";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "test-app");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intSystem);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputStreamId1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputStreamId2);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem, inputStreamId3);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId1, outputSystem, outputStreamId1);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2);
+    Config config = new MapConfig(configs);
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+    MessageStream messageStream1 = graphSpec.getInputStream(inputStreamId1).map(m -> m);
+    MessageStream messageStream2 = graphSpec.getInputStream(inputStreamId2).filter(m -> true);
     MessageStream messageStream3 =
-        graphSpec.getInputStream("input3")
+        graphSpec.getInputStream(inputStreamId3)
             .filter(m -> true)
-            .partitionBy(m -> "hehe", m -> m, "p1")
+            .partitionBy(m -> "m", m -> m, "p1")
             .map(m -> m);
-    OutputStream<Object> outputStream1 = graphSpec.getOutputStream("output1");
-    OutputStream<Object> outputStream2 = graphSpec.getOutputStream("output2");
+    OutputStream<Object> outputStream1 = graphSpec.getOutputStream(outputStreamId1);
+    OutputStream<Object> outputStream2 = graphSpec.getOutputStream(outputStreamId2);
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
-        .partitionBy(m -> "haha", m -> m, "p2")
+        .partitionBy(m -> "m", m -> m, "p2")
         .sendTo(outputStream1);
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
@@ -562,19 +581,41 @@ public class TestOperatorImplGraph {
         .sendTo(outputStream2);
 
     Multimap<SystemStream, SystemStream> outputToInput =
-        OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph());
-    Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
+        OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph(), new StreamConfig(config));
+    Collection<SystemStream> inputs = outputToInput.get(new SystemStream(intSystem, intStreamId2));
     assertEquals(inputs.size(), 2);
-    assertTrue(inputs.contains(input1.toSystemStream()));
-    assertTrue(inputs.contains(input2.toSystemStream()));
+    assertTrue(inputs.contains(new SystemStream(inputSystem, inputStreamId1)));
+    assertTrue(inputs.contains(new SystemStream(inputSystem, inputStreamId2)));
 
-    inputs = outputToInput.get(int2.toSystemStream());
+    inputs = outputToInput.get(new SystemStream(intSystem, intStreamId1));
     assertEquals(inputs.size(), 1);
-    assertEquals(inputs.iterator().next(), input3.toSystemStream());
+    assertEquals(inputs.iterator().next(), new SystemStream(inputSystem, inputStreamId3));
   }
 
   @Test
   public void testGetProducerTaskCountForIntermediateStreams() {
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
+    String inputStreamId3 = "input3";
+    String inputSystem1 = "system1";
+    String inputSystem2 = "system2";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "test-app");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), inputSystem1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem1, inputStreamId1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem2, inputStreamId2);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem2, inputStreamId3);
+    Config config = new MapConfig(configs);
+
+    SystemStream input1 = new SystemStream("system1", "intput1");
+    SystemStream input2 = new SystemStream("system2", "intput2");
+    SystemStream input3 = new SystemStream("system2", "intput3");
+
+    SystemStream int1 = new SystemStream("system1", "int1");
+    SystemStream int2 = new SystemStream("system1", "int2");
+
+
     /**
      * the task assignment looks like the following:
      *
@@ -585,15 +626,6 @@ public class TestOperatorImplGraph {
      * input3 ------> task1 -----------> int2
      *
      */
-
-    SystemStream input1 = new SystemStream("system1", "intput1");
-    SystemStream input2 = new SystemStream("system2", "intput2");
-    SystemStream input3 = new SystemStream("system2", "intput3");
-
-    SystemStream int1 = new SystemStream("system1", "int1");
-    SystemStream int2 = new SystemStream("system1", "int2");
-
-
     String task0 = "Task 0";
     String task1 = "Task 1";
     String task2 = "Task 2";
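
The TestOperatorImplGraph changes above replace the ApplicationRunner.getStreamSpec(...) mocks with a new StreamTestUtils.addStreamConfigs helper (imported near the top of this file's diff). Its body is not shown in this section; the following is only a hypothetical sketch of the contract it would need to satisfy, assuming it writes the per-stream system and physical-name properties that StreamConfig reads.

    // Hypothetical sketch only; the actual StreamTestUtils added by the patch may differ.
    public static void addStreamConfigs(Map<String, String> configs,
        String streamId, String systemName, String physicalName) {
      configs.put("streams." + streamId + ".samza.system", systemName);
      configs.put("streams." + streamId + ".samza.physical.name", physicalName);
    }
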
index 9741fc4..0ef6680 100644
@@ -42,13 +42,11 @@ import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
@@ -80,7 +78,6 @@ public class TestWindowOperator {
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
   private Config config;
   private TaskContextImpl taskContext;
-  private ApplicationRunner runner;
 
   @Before
   public void setup() throws Exception {
@@ -88,19 +85,16 @@ public class TestWindowOperator {
     when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
     taskContext = mock(TaskContextImpl.class);
-    runner = mock(ApplicationRunner.class);
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
     Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
 
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(taskContext.getStore("jobName-jobId-window-w1"))
         .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
-    when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
 
     Map<String, String> mapConfig = new HashMap<>();
-    mapConfig.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
     mapConfig.put("job.default.system", "kafka");
     mapConfig.put("job.name", "jobName");
     mapConfig.put("job.id", "jobId");
@@ -552,7 +546,7 @@ public class TestWindowOperator {
 
   private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
     graph.getInputStream("integers", kvSerde)
@@ -568,7 +562,7 @@ public class TestWindowOperator {
 
   private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
     graph.getInputStream("integers", kvSerde)
@@ -582,7 +576,7 @@ public class TestWindowOperator {
   }
 
   private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
     graph.getInputStream("integers", kvSerde)
@@ -597,7 +591,7 @@ public class TestWindowOperator {
 
   private StreamGraphSpec getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration,
         Trigger<IntegerEnvelope> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers",
         KVSerde.of(new IntegerSerde(), new IntegerSerde()));
index b39b0d0..7704a5b 100644 (file)
@@ -31,7 +31,6 @@ import org.apache.samza.operators.TableImpl;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 
 import static org.junit.Assert.*;
@@ -105,8 +104,8 @@ public class OperatorSpecTestUtils {
     assertEquals(oTable.getTableSpec(), nTable.getTableSpec());
   }
 
-  private static void assertClonedOutputs(Map<StreamSpec, OutputStreamImpl> originalOutputs,
-      Map<StreamSpec, OutputStreamImpl> clonedOutputs) {
+  private static void assertClonedOutputs(Map<String, OutputStreamImpl> originalOutputs,
+      Map<String, OutputStreamImpl> clonedOutputs) {
     assertEquals(originalOutputs.size(), clonedOutputs.size());
     assertEquals(originalOutputs.keySet(), clonedOutputs.keySet());
     Iterator<OutputStreamImpl> oIter = originalOutputs.values().iterator();
@@ -117,12 +116,11 @@ public class OperatorSpecTestUtils {
   private static void assertClonedOutputImpl(OutputStreamImpl oOutput, OutputStreamImpl nOutput) {
     assertNotEquals(oOutput, nOutput);
     assertEquals(oOutput.isKeyed(), nOutput.isKeyed());
-    assertEquals(oOutput.getSystemStream(), nOutput.getSystemStream());
-    assertEquals(oOutput.getStreamSpec(), nOutput.getStreamSpec());
+    assertEquals(oOutput.getStreamId(), nOutput.getStreamId());
   }
 
-  private static void assertClonedInputs(Map<StreamSpec, InputOperatorSpec> originalInputs,
-      Map<StreamSpec, InputOperatorSpec> clonedInputs) {
+  private static void assertClonedInputs(Map<String, InputOperatorSpec> originalInputs,
+      Map<String, InputOperatorSpec> clonedInputs) {
     assertEquals(originalInputs.size(), clonedInputs.size());
     assertEquals(originalInputs.keySet(), clonedInputs.keySet());
     Iterator<InputOperatorSpec> oIter = originalInputs.values().iterator();
@@ -134,7 +132,7 @@ public class OperatorSpecTestUtils {
     assertNotEquals(originalInput, clonedInput);
     assertEquals(originalInput.getOpId(), clonedInput.getOpId());
     assertEquals(originalInput.getOpCode(), clonedInput.getOpCode());
-    assertEquals(originalInput.getStreamSpec(), clonedInput.getStreamSpec());
+    assertEquals(originalInput.getStreamId(), clonedInput.getStreamId());
     assertEquals(originalInput.isKeyed(), clonedInput.isKeyed());
   }
 
index cb221b0..b27c944 100644 (file)
@@ -40,7 +40,6 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -50,7 +49,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 
 /**
@@ -230,9 +228,8 @@ public class TestOperatorSpec {
       }
     };
 
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
     InputOperatorSpec<String, Object> inputOperatorSpec = new InputOperatorSpec<>(
-        mockStreamSpec, new StringSerde("UTF-8"), objSerde, true, "op0");
+        "mockStreamId", new StringSerde("UTF-8"), objSerde, true, "op0");
     InputOperatorSpec<String, Object> inputOpCopy = (InputOperatorSpec<String, Object>) OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec);
 
     assertNotEquals("Expected deserialized copy of operator spec should not be the same as the original operator spec", inputOperatorSpec, inputOpCopy);
@@ -254,9 +251,8 @@ public class TestOperatorSpec {
         return new byte[0];
       }
     };
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    OutputStreamImpl<KV<String, Object>> outputStrmImpl = new OutputStreamImpl<>(mockStreamSpec, new StringSerde("UTF-8"), objSerde, true);
-    OutputOperatorSpec<KV<String, Object>> outputOperatorSpec = new OutputOperatorSpec<KV<String, Object>>(outputStrmImpl, "op0");
+    OutputStreamImpl<KV<String, Object>> outputStrmImpl = new OutputStreamImpl<>("mockStreamId", new StringSerde("UTF-8"), objSerde, true);
+    OutputOperatorSpec<KV<String, Object>> outputOperatorSpec = new OutputOperatorSpec<>(outputStrmImpl, "op0");
     OutputOperatorSpec<KV<String, Object>> outputOpCopy = (OutputOperatorSpec<KV<String, Object>>) OperatorSpecTestUtils
         .copyOpSpec(outputOperatorSpec);
     assertNotEquals("Expected deserialized copy of operator spec should not be the same as the original operator spec", outputOperatorSpec, outputOpCopy);
@@ -276,10 +272,10 @@ public class TestOperatorSpec {
   public void testJoinOperatorSpec() {
 
     InputOperatorSpec<TestMessageEnvelope, Object> leftOpSpec = new InputOperatorSpec<>(
-        new StreamSpec("test-input-1", "test-input-1", "kafka"), new NoOpSerde<>(),
+        "test-input-1", new NoOpSerde<>(),
         new NoOpSerde<>(), false, "op0");
     InputOperatorSpec<TestMessageEnvelope, Object> rightOpSpec = new InputOperatorSpec<>(
-        new StreamSpec("test-input-2", "test-input-2", "kafka"), new NoOpSerde<>(),
+        "test-input-2", new NoOpSerde<>(),
         new NoOpSerde<>(), false, "op1");
 
     Serde<Object> objSerde = new Serde<Object>() {
@@ -341,14 +337,14 @@ public class TestOperatorSpec {
   @Test
   public void testBroadcastOperatorSpec() {
     OutputStreamImpl<TestOutputMessageEnvelope> outputStream =
-        new OutputStreamImpl<>(new StreamSpec("output-0", "outputStream-0", "kafka"), new StringSerde("UTF-8"), new JsonSerdeV2<TestOutputMessageEnvelope>(), true);
+        new OutputStreamImpl<>("output-0", new StringSerde("UTF-8"), new JsonSerdeV2<TestOutputMessageEnvelope>(), true);
     BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpSpec = new BroadcastOperatorSpec<>(outputStream, "broadcast-1");
     BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpCopy = (BroadcastOperatorSpec<TestOutputMessageEnvelope>) OperatorSpecTestUtils
         .copyOpSpec(broadcastOpSpec);
     assertNotEquals(broadcastOpCopy, broadcastOpSpec);
     assertEquals(broadcastOpCopy.getOpId(), broadcastOpSpec.getOpId());
     assertTrue(broadcastOpCopy.getOutputStream() != broadcastOpSpec.getOutputStream());
-    assertEquals(broadcastOpCopy.getOutputStream().getSystemStream(), broadcastOpSpec.getOutputStream().getSystemStream());
+    assertEquals(broadcastOpCopy.getOutputStream().getStreamId(), broadcastOpSpec.getOutputStream().getStreamId());
     assertEquals(broadcastOpCopy.getOutputStream().isKeyed(), broadcastOpSpec.getOutputStream().isKeyed());
   }
 
index 00ec176..9bbcbfa 100644 (file)
@@ -29,9 +29,7 @@ import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -45,7 +43,6 @@ import static org.mockito.Mockito.*;
  */
 public class TestPartitionByOperatorSpec {
 
-  private final ApplicationRunner mockRunner = mock(ApplicationRunner.class);
   private final Config mockConfig = mock(Config.class);
   private final String testInputId = "test-input-1";
   private final String testJobName = "testJob";
@@ -93,12 +90,7 @@ public class TestPartitionByOperatorSpec {
   public void setup() {
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn(testJobName);
     when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn(testJobId);
-    StreamSpec inputSpec1 = new StreamSpec(testInputId, testInputId, "kafka");
-    when(mockRunner.getStreamSpec(testInputId)).thenReturn(inputSpec1);
-    String intermediateStreamName = String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName);
-    StreamSpec intermediateSpec1 = new StreamSpec(intermediateStreamName, intermediateStreamName, "kafka");
-    when(mockRunner.getStreamSpec(intermediateStreamName)).thenReturn(intermediateSpec1);
-    graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    graphSpec = new StreamGraphSpec(mockConfig);
   }
 
   @Test
@@ -109,7 +101,7 @@ public class TestPartitionByOperatorSpec {
     MessageStream<KV<String, Object>>
         reparStream = inputStream.partitionBy(keyFn, valueFn, testReparStreamName);
     InputOperatorSpec inputOpSpec = (InputOperatorSpec) Whitebox.getInternalState(reparStream, "operatorSpec");
-    assertEquals(inputOpSpec.getStreamSpec().getId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName));
+    assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName));
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
@@ -121,7 +113,7 @@ public class TestPartitionByOperatorSpec {
     assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName));
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
-    assertEquals(reparOpSpec.getOutputStream().getStreamSpec(), new StreamSpec(reparOpSpec.getOpId(), reparOpSpec.getOpId(), "kafka"));
+    assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
     assertNull(reparOpSpec.getTimerFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
index e207772..0b91315 100644 (file)
@@ -27,7 +27,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.testUtils.TestAsyncStreamTask;
 import org.apache.samza.testUtils.TestStreamTask;
 import org.junit.Test;
@@ -46,8 +45,6 @@ import static org.mockito.Mockito.mock;
  */
 public class TestTaskFactoryUtil {
 
-  private final ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-
   @Test
   public void testStreamTaskClass() {
     Config config = new MapConfig(new HashMap<String, String>() {
@@ -81,7 +78,7 @@ public class TestTaskFactoryUtil {
     });
     StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
     assertNotNull(streamApp);
-    StreamGraphSpec graph = new StreamGraphSpec(mockRunner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
     streamApp.init(graph, config);
     Object retFactory = TaskFactoryUtil.createTaskFactory(graph.getOperatorSpecGraph(), null);
     assertTrue(retFactory instanceof StreamTaskFactory);
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
new file mode 100644 (file)
index 0000000..9b1fa4d
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import java.util.Map;
+import org.apache.samza.config.StreamConfig$;
+
+public class StreamTestUtils {
+
+  /**
+   * Adds the stream.stream-id.* configurations for the provided {@code streamId} to the provided {@code configs} Map.
+   *
+   * @param configs the configs Map to add the stream configurations to
+   * @param streamId the id of the stream
+   * @param systemName the system for the stream
+   * @param physicalName the physical name for the stream
+   */
+  public static void addStreamConfigs(Map<String, String> configs,
+      String streamId, String systemName, String physicalName) {
+    configs.put(String.format(StreamConfig$.MODULE$.SYSTEM_FOR_STREAM_ID(), streamId), systemName);
+    configs.put(String.format(StreamConfig$.MODULE$.PHYSICAL_NAME_FOR_STREAM_ID(), streamId), physicalName);
+  }
+}
\ No newline at end of file
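A quick sketch of how a test might use this new helper before building a StreamConfig; the stream id, system, and topic names below are made-up values, not ones from this patch:

    Map<String, String> configs = new HashMap<>();
    StreamTestUtils.addStreamConfigs(configs, "pageViews", "kafka", "PageViewEvent");
    // configs now holds the per-stream system and physical-name entries for "pageViews",
    // so a StreamConfig built from it can resolve that stream id.
    Config config = new MapConfig(configs);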
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.runtime;
+package org.apache.samza.util;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
-public class TestAbstractApplicationRunner {
+
+public class TestStreamUtil {
   private static final String STREAM_ID = "t3st-Stream_Id";
   private static final String STREAM_ID_INVALID = "test#Str3amId!";
 
@@ -40,26 +40,19 @@ public class TestAbstractApplicationRunner {
   private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
 
   private static final String TEST_SYSTEM = "t3st-System_Name";
-  private static final String TEST_SYSTEM2 = "testSystemName2";
   private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
 
   private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
 
 
-  @Test(expected = NullPointerException.class)
-  public void testConfigValidation() {
-    new TestAbstractApplicationRunnerImpl(null);
-  }
-
   // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
   @Test
-  public void testgetStreamWithPhysicalNameInConfig() {
+  public void testGetStreamWithPhysicalNameInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
   }
@@ -67,80 +60,74 @@ public class TestAbstractApplicationRunner {
   // The streamId should be used as the physicalName when the physical name is not specified.
   // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity.
   @Test
-  public void testgetStreamWithoutPhysicalNameInConfig() {
+  public void testGetStreamWithoutPhysicalNameInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(STREAM_ID, spec.getPhysicalName());
   }
 
   // If the system is specified at the stream scope, use it
   @Test
-  public void testgetStreamWithSystemAtStreamScopeInConfig() {
+  public void testGetStreamWithSystemAtStreamScopeInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
 
   // If system isn't specified at stream scope, use the default system
   @Test
-  public void testgetStreamWithSystemAtDefaultScopeInConfig() {
+  public void testGetStreamWithSystemAtDefaultScopeInConfig() {
     Config config = addConfigs(buildStreamConfig(STREAM_ID,
-                                                  StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
-                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+        JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
   }
 
   // Stream scope should override default scope
   @Test
-  public void testgetStreamWithSystemAtBothScopesInConfig() {
+  public void testGetStreamWithSystemAtBothScopesInConfig() {
     Config config = addConfigs(buildStreamConfig(STREAM_ID,
-                                                StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                                StreamConfig.SYSTEM(), TEST_SYSTEM),
-                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM),
+        JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
 
   // System is required. Throw if it cannot be determined.
   @Test(expected = IllegalArgumentException.class)
-  public void testgetStreamWithOutSystemInConfig() {
+  public void testGetStreamWithOutSystemInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(TEST_SYSTEM, spec.getSystemName());
   }
 
   // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
   @Test
-  public void testgetStreamPropertiesPassthrough() {
+  public void testGetStreamPropertiesPassthrough() {
     Config config = buildStreamConfig(STREAM_ID,
-                                    StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
-                                    "systemProperty1", "systemValue1",
-                                    "systemProperty2", "systemValue2",
-                                    "systemProperty3", "systemValue3");
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "systemProperty1", "systemValue1",
+        "systemProperty2", "systemValue2",
+        "systemProperty3", "systemValue3");
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -156,14 +143,13 @@ public class TestAbstractApplicationRunner {
   @Test
   public void testGetStreamSamzaPropertiesOmitted() {
     Config config = buildStreamConfig(STREAM_ID,
-                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
-                                    "systemProperty1", "systemValue1",
-                                    "systemProperty2", "systemValue2",
-                                    "systemProperty3", "systemValue3");
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "systemProperty1", "systemValue1",
+        "systemProperty2", "systemValue2",
+        "systemProperty3", "systemValue3");
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -185,8 +171,7 @@ public class TestAbstractApplicationRunner {
         sysStreamPrefix + "systemProperty4", "systemValue4",
         sysStreamPrefix + "systemProperty2", "systemValue8");
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(4, properties.size());
@@ -203,10 +188,9 @@ public class TestAbstractApplicationRunner {
         "segment.bytes", "5309"),
         String.format("systems.%s.default.stream.replication.factor", TEST_SYSTEM), "4", // System default property
         String.format("systems.%s.default.stream.segment.bytest", TEST_SYSTEM), "867"
-        );
+    );
 
-    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     Map<String, String> properties = spec.getConfig();
     assertEquals(3, properties.size());
@@ -218,11 +202,10 @@ public class TestAbstractApplicationRunner {
   @Test
   public void testGetStreamPhysicalNameArgSimple() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(STREAM_ID, spec.getId());
     assertEquals(TEST_PHYSICAL_NAME2, spec.getPhysicalName());
@@ -233,11 +216,10 @@ public class TestAbstractApplicationRunner {
   @Test
   public void testGetStreamPhysicalNameArgSpecialCharacters() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME_SPECIAL_CHARS,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME_SPECIAL_CHARS,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
     assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
   }
 
@@ -245,11 +227,10 @@ public class TestAbstractApplicationRunner {
   @Test
   public void testGetStreamPhysicalNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), null,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+        StreamConfig.PHYSICAL_NAME(), null,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
     assertNull(spec.getPhysicalName());
   }
 
@@ -257,11 +238,10 @@ public class TestAbstractApplicationRunner {
   @Test
   public void testGetStreamSystemNameArgValid() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, // This should be ignored because of the explicit arg
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);              // This too
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, // This should be ignored because of the explicit arg
+        StreamConfig.SYSTEM(), TEST_SYSTEM);              // This too
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
 
     assertEquals(STREAM_ID, spec.getId());
     assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -272,11 +252,10 @@ public class TestAbstractApplicationRunner {
   @Test(expected = IllegalArgumentException.class)
   public void testGetStreamSystemNameArgInvalid() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM_INVALID);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM_INVALID);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID);
+    StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
   }
 
   // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
@@ -286,19 +265,17 @@ public class TestAbstractApplicationRunner {
         StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
         StreamConfig.SYSTEM(), "");
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID);
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
   }
 
  // Null is not allowed for system name.
   @Test(expected = IllegalArgumentException.class)
   public void testGetStreamSystemNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), null);
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), null);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID);
+    StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
   }
 
   // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
@@ -307,8 +284,7 @@ public class TestAbstractApplicationRunner {
     Config config = buildStreamConfig(STREAM_ID_INVALID,
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID_INVALID);
+    StreamUtil.getStreamSpec(STREAM_ID_INVALID, new StreamConfig(config));
   }
 
   // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
@@ -317,8 +293,7 @@ public class TestAbstractApplicationRunner {
     Config config = buildStreamConfig("",
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec("");
+    StreamUtil.getStreamSpec("", new StreamConfig(config));
   }
 
   // Null is not allowed for streamId.
@@ -327,8 +302,7 @@ public class TestAbstractApplicationRunner {
     Config config = buildStreamConfig(null,
         StreamConfig.SYSTEM(), TEST_SYSTEM);
 
-    AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(null);
+    StreamUtil.getStreamSpec(null, new StreamConfig(config));
   }
 
 
@@ -360,32 +334,4 @@ public class TestAbstractApplicationRunner {
     result.putAll(buildConfig(kvs));
     return new MapConfig(result);
   }
-
-  private class TestAbstractApplicationRunnerImpl extends AbstractApplicationRunner {
-
-    public TestAbstractApplicationRunnerImpl(Config config) {
-      super(config);
-    }
-
-    @Override
-    public void runTask() {
-      throw new UnsupportedOperationException("runTask is not supported in this test");
-    }
-
-    @Override
-    public void run(StreamApplication streamApp) {
-      // do nothing. We're only testing the stream creation methods at this point.
-    }
-
-    @Override
-    public void kill(StreamApplication streamApp) {
-      // do nothing. We're only testing the stream creation methods at this point.
-    }
-
-    @Override
-    public ApplicationStatus status(StreamApplication streamApp) {
-      // do nothing. We're only testing the stream creation methods at this point.
-      return null;
-    }
-  }
 }
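Summarizing the precedence these renamed tests exercise (stream-scope system wins over job.default.system, and a missing system is an error); the stream id is hypothetical and the keys follow Samza's standard streams.* config layout:

    Map<String, String> map = new HashMap<>();
    map.put("job.default.system", "defaultSystem");              // job-scope default
    map.put("streams.my-stream.samza.system", "streamSystem");   // stream-scope override
    map.put("streams.my-stream.samza.physical.name", "MyTopic");
    StreamSpec spec = StreamUtil.getStreamSpec("my-stream", new StreamConfig(new MapConfig(map)));
    // spec.getSystemName() is "streamSystem"; drop the stream-scope key and it falls back to
    // "defaultSystem"; drop both and an IllegalArgumentException is thrown.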
index 217248d..113dced 100644 (file)
@@ -110,7 +110,6 @@ public class KafkaStreamSpec extends StreamSpec {
                                 originalSpec.getSystemName(),
                                 originalSpec.getPartitionCount(),
                                 replicationFactor,
-                                originalSpec.isBroadcast(),
                                 mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
   }
 
@@ -125,7 +124,7 @@ public class KafkaStreamSpec extends StreamSpec {
    * @param partitionCount  The number of partitions.
    */
   public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount) {
-    this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, false, new Properties());
+    this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties());
   }
 
   /**
@@ -146,13 +145,11 @@ public class KafkaStreamSpec extends StreamSpec {
    *
    * @param replicationFactor The number of topic replicas in the Kafka cluster for durability.
    *
-   * @param isBroadcast       The stream is broadcast or not.
-   *
    * @param properties        A set of properties for the stream. These may be System-specfic.
    */
   public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount, int replicationFactor,
-      Boolean isBroadcast, Properties properties) {
-    super(id, topicName, systemName, partitionCount, false, isBroadcast, propertiesToMap(properties));
+      Properties properties) {
+    super(id, topicName, systemName, partitionCount, propertiesToMap(properties));
 
     if (partitionCount < 1) {
       throw new IllegalArgumentException("Parameter 'partitionCount' must be > 0");
@@ -168,12 +165,12 @@ public class KafkaStreamSpec extends StreamSpec {
   @Override
   public StreamSpec copyWithPartitionCount(int partitionCount) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(),
-        isBroadcast(), getProperties());
+        getProperties());
   }
 
   public KafkaStreamSpec copyWithReplicationFactor(int replicationFactor) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), replicationFactor,
-        isBroadcast(), getProperties());
+        getProperties());
   }
 
   /**
@@ -183,7 +180,7 @@ public class KafkaStreamSpec extends StreamSpec {
    */
   public KafkaStreamSpec copyWithProperties(Properties properties) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), getReplicationFactor(),
-        isBroadcast(), properties);
+        properties);
   }
 
   public int getReplicationFactor() {
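For reference, a sketch of constructing a spec with the updated signature now that the isBroadcast parameter is gone; the topic, system, and counts are illustrative only:

    Properties props = new Properties();
    props.setProperty("cleanup.policy", "compact");   // any Kafka topic property
    KafkaStreamSpec spec = new KafkaStreamSpec("pageViews", "PageViewEvent", "kafka",
        8 /* partitions */, 2 /* replication factor */, props);
    StreamSpec resized = spec.copyWithPartitionCount(16);   // copies preserve the remaining fields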
index 07f4710..26664ea 100644 (file)
@@ -32,7 +32,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{Logging, StreamUtil}
 
 import scala.collection.JavaConverters._
 
@@ -237,7 +237,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
       val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
 
       storageConfig.getChangelogStream(storeName).foreach(changelogName => {
-        val systemStream = Util.getSystemStreamFromNames(changelogName)
+        val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
         val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
         storeToChangelog += storeName -> systemStream.getStream
       })
index 6dc2f82..ce4544b 100644 (file)
@@ -21,11 +21,11 @@ package org.apache.samza.config
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
+import org.apache.samza.config.KafkaConfig.{Config2Kafka, REGEX_RESOLVED_STREAMS}
 import org.apache.samza.SamzaException
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, StreamUtil}
+
 import collection.JavaConverters._
-import org.apache.samza.util.Logging
 import scala.collection._
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.system.SystemStream
@@ -87,7 +87,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
     info("Generated config values for %d new topics" format newInputStreams.size)
 
     val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ newInputStreams)
-      .map(Util.getNameFromSystemStream)
+      .map(StreamUtil.getNameFromSystemStream)
       .toArray
       .sortWith(_ < _)
       .mkString(",")
index a63db03..6ab4d32 100644 (file)
@@ -486,10 +486,10 @@ class KafkaSystemAdmin(
       val topicName = spec.getPhysicalName
       val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName))
       new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor,
-        spec.isBroadcast, topicMeta.kafkaProps)
+        topicMeta.kafkaProps)
     } else if (spec.isCoordinatorStream){
       new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor,
-        spec.isBroadcast, coordinatorStreamProperties)
+        coordinatorStreamProperties)
     } else if (intermediateStreamProperties.contains(spec.getId)) {
       KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
     } else {
index c00ed2d..14d2fe6 100644 (file)
@@ -21,20 +21,20 @@ package org.apache.samza.system.kafka;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.samza.runtime.TestAbstractApplicationRunner;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.util.TestStreamUtil;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 /**
- * See also the general StreamSpec tests in {@link TestAbstractApplicationRunner}
+ * See also the general StreamSpec tests in {@link TestStreamUtil}
  */
 public class TestKafkaStreamSpec {
 
   @Test
   public void testUnsupportedConfigStrippedFromProperties() {
-    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", false, ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
+    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
 
     // First verify the original
     assertEquals("7", original.get("replication.factor"));
index 71718b0..8d92f4d 100644 (file)
@@ -113,7 +113,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName)
 
     val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
-    val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props)
+    val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, props)
     val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
     checkPointManager.MaxRetryDurationMs = 1
 
@@ -193,7 +193,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
     val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
 
-    val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, false, props)
+    val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props)
     new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)
   }
 
index 65b8c8c..ede7995 100644 (file)
@@ -26,11 +26,11 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
@@ -86,17 +86,25 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("SIMPLE1",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("SIMPLE1", inputPhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -130,17 +138,24 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("COMPLEX1",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("COMPLEX1", inputPhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -155,17 +170,24 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("COMPLEX1",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("COMPLEX1", inputPhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -184,7 +206,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -203,7 +225,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -222,7 +244,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -241,7 +263,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -258,7 +280,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -277,7 +299,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -297,7 +319,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -316,7 +338,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -335,7 +357,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -354,7 +376,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -373,7 +395,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -396,7 +418,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -415,7 +437,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -434,29 +456,40 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String input1System = streamConfig.getSystem(input1StreamId);
+    String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId);
+    String input2StreamId = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String input2System = streamConfig.getSystem(input2StreamId);
+    String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId);
+    String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
+    String input3System = streamConfig.getSystem(input3StreamId);
+    String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String output1System = streamConfig.getSystem(output1StreamId);
+    String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
+    String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String output2System = streamConfig.getSystem(output2StreamId);
+    String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+
     Assert.assertEquals(2, specGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", output1System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
+    Assert.assertEquals("testavro", output2System);
+    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
 
     Assert.assertEquals(3, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", input1System);
+    Assert.assertEquals("PAGEVIEW", input1PhysicalName);
+    Assert.assertEquals("testavro", input2System);
+    Assert.assertEquals("PROFILE", input2PhysicalName);
+    Assert.assertEquals("kafka", input3System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -476,32 +509,41 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
 
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String input1System = streamConfig.getSystem(input1StreamId);
+    String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId);
+    String input2StreamId = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String input2System = streamConfig.getSystem(input2StreamId);
+    String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId);
+    String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
+    String input3System = streamConfig.getSystem(input3StreamId);
+    String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String output1System = streamConfig.getSystem(output1StreamId);
+    String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
+    String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String output2System = streamConfig.getSystem(output2StreamId);
+    String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+
     Assert.assertEquals(2, specGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic",
-        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", output1System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
+    Assert.assertEquals("testavro", output2System);
+    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
 
     Assert.assertEquals(3, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", input1System);
+    Assert.assertEquals("PAGEVIEW", input1PhysicalName);
+    Assert.assertEquals("testavro", input2System);
+    Assert.assertEquals("PROFILE", input2PhysicalName);
+    Assert.assertEquals("kafka", input3System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -521,32 +563,41 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
 
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String input1System = streamConfig.getSystem(input1StreamId);
+    String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId);
+    String input2StreamId = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String input2System = streamConfig.getSystem(input2StreamId);
+    String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId);
+    String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
+    String input3System = streamConfig.getSystem(input3StreamId);
+    String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String output1System = streamConfig.getSystem(output1StreamId);
+    String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
+    String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String output2System = streamConfig.getSystem(output2StreamId);
+    String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+
     Assert.assertEquals(2, specGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic",
-        specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", output1System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
+    Assert.assertEquals("testavro", output2System);
+    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
 
     Assert.assertEquals(3, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", input1System);
+    Assert.assertEquals("PROFILE", input1PhysicalName);
+    Assert.assertEquals("testavro", input2System);
+    Assert.assertEquals("PAGEVIEW", input2PhysicalName);
+    Assert.assertEquals("kafka", input3System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -566,7 +617,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
@@ -590,7 +641,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 }
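Note on the updated TestQueryTranslator assertions above: system and physical names are now resolved from config by logical streamId instead of being read off StreamSpec keys. A minimal sketch of that lookup, using only the StreamConfig accessors and SystemStream constructor that appear in this change; the helper class and method names are illustrative, not part of the patch:

    import org.apache.samza.config.Config;
    import org.apache.samza.config.StreamConfig;
    import org.apache.samza.system.SystemStream;

    // Illustrative helper: resolve a logical streamId to its SystemStream the same
    // way the updated tests do, via StreamConfig lookups on the job config.
    class StreamIdResolver {
      static SystemStream toSystemStream(Config config, String streamId) {
        StreamConfig streamConfig = new StreamConfig(config);
        String system = streamConfig.getSystem(streamId);             // e.g. "kafka" or "testavro"
        String physicalName = streamConfig.getPhysicalName(streamId); // e.g. "enrichedPageViewTopic"
        return new SystemStream(system, physicalName);
      }
    }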
index 617cea6..4309d92 100644
@@ -30,7 +30,7 @@ import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 
 /**
  * A simple test job that reads strings, converts them to integers, multiplies
@@ -59,7 +59,7 @@ public class NegateNumberTask implements StreamTask, InitableTask {
     if (outputSystemStreamString == null) {
       throw new ConfigException("Missing required configuration: task.outputs");
     }
-    outputSystemStream = Util.getSystemStreamFromNames(outputSystemStreamString);
+    outputSystemStream = StreamUtil.getSystemStreamFromNames(outputSystemStreamString);
   }
 
   @Override
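The getSystemStreamFromNames helper used by NegateNumberTask has moved from Util to StreamUtil. A hedged usage sketch, assuming the same "system.stream" name format that the task.outputs config uses; the topic name below is illustrative, not taken from the patch:

    import org.apache.samza.system.SystemStream;
    import org.apache.samza.util.StreamUtil;

    // Parses a "system.stream" style name into a SystemStream.
    // "kafka.negated-numbers" is an illustrative value only.
    SystemStream output = StreamUtil.getSystemStreamFromNames("kafka.negated-numbers");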
index d1f1d84..99d047d 100644
@@ -27,7 +27,7 @@ import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging}
+import org.apache.samza.util.{Logging, StreamUtil}
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 
@@ -85,7 +85,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging {
   def init(config: Config, context: TaskContext) {
     logInterval = config.getInt("task.log.interval", 10000)
     maxMessages = config.getInt("task.max.messages", 10000000)
-    outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_))
+    outputSystemStream = Option(config.get("task.outputs", null)).map(StreamUtil.getSystemStreamFromNames)
   }
 
   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
index 2171d07..ec9c05d 100644
@@ -34,7 +34,6 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.operator.data.AdClick;
@@ -54,7 +53,7 @@ public class RepartitionJoinWindowApp implements StreamApplication {
   public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
   public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
 
-  private final List<StreamSpec> intermediateStreams = new ArrayList<>();
+  private final List<String> intermediateStreamIds = new ArrayList<>();
 
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
@@ -106,14 +105,14 @@ public class RepartitionJoinWindowApp implements StreamApplication {
           });
 
 
-    intermediateStreams.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamSpec());
-    intermediateStreams.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamSpec());
-    intermediateStreams.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamSpec());
+    intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId());
+    intermediateStreamIds.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamId());
+    intermediateStreamIds.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamId());
 
   }
 
-  public List<StreamSpec> getIntermediateStreams() {
-    return intermediateStreams;
+  List<String> getIntermediateStreamIds() {
+    return intermediateStreamIds;
   }
 
   private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {
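RepartitionJoinWindowApp now records logical streamIds (via IntermediateMessageStreamImpl.getStreamId) rather than StreamSpec instances. The integration test in the next file passes those ids straight to the system admin, which works when a stream's physical name defaults to its streamId; a hedged sketch of the explicit mapping, should the two ever differ, reusing the StreamConfig lookup shown earlier (config and streamId stand for the test's job config and a recorded id):

    // Assumption: StreamConfig falls back to the streamId when no physical name is
    // configured, so for these intermediate streams the two values are identical.
    StreamConfig streamConfig = new StreamConfig(config);
    String physicalName = streamConfig.getPhysicalName(streamId);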
index a2adb70..a9a4026 100644
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.samza.Partition;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.kafka.KafkaSystemAdmin;
@@ -122,14 +121,14 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
 
     // Verify that messages in the intermediate stream will be deleted in 10 seconds
     long startTimeMs = System.currentTimeMillis();
-    for (StreamSpec spec: app.getIntermediateStreams()) {
+    for (String streamId: app.getIntermediateStreamIds()) {
       long remainingMessageNum = -1;
 
       while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {
         remainingMessageNum = 0;
         SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata(
-            new HashSet<>(Arrays.asList(spec.getPhysicalName())), new ExponentialSleepStrategy.Mock(3)
-        ).get(spec.getPhysicalName()).get();
+            new HashSet<>(Arrays.asList(streamId)), new ExponentialSleepStrategy.Mock(3)
+        ).get(streamId).get();
 
         for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata().entrySet()) {
           SystemStreamPartitionMetadata metadata = entry.getValue();