Merge branch 'master' into 0.14.0
author Xinyu Liu <xiliu@xiliu-ld1.linkedin.biz>
Tue, 3 Oct 2017 22:09:41 +0000 (15:09 -0700)
committer Xinyu Liu <xiliu@xiliu-ld1.linkedin.biz>
Tue, 3 Oct 2017 22:09:41 +0000 (15:09 -0700)
22 files changed:
docs/learn/documentation/versioned/jobs/configuration-table.html
samza-core/src/main/java/org/apache/samza/execution/JobNode.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java

diff --cc samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@@ -123,8 -132,11 +132,11 @@@ public class JobNode 
      configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
  
      // write input/output streams to configs
 -    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> addStreamConfig(edge, configs));
 +    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig()));
  
+     // write serialized serde instances and stream serde configs to configs
+     addSerdeConfigs(configs);
      log.info("Job {} has generated configs {}", jobName, configs);
  
      String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
diff --cc samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
   */
  package org.apache.samza.operators.impl;
  
- import java.util.Collection;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.Set;
+ import org.apache.samza.SamzaException;
  import org.apache.samza.config.Config;
  import org.apache.samza.config.MetricsConfig;
 +import org.apache.samza.container.TaskContextImpl;
 +import org.apache.samza.container.TaskName;
 +import org.apache.samza.operators.functions.WatermarkFunction;
 +import org.apache.samza.system.EndOfStreamMessage;
  import org.apache.samza.metrics.Counter;
  import org.apache.samza.metrics.MetricsRegistry;
  import org.apache.samza.metrics.Timer;
@@@ -39,14 -29,20 +36,23 @@@ import org.apache.samza.task.MessageCol
  import org.apache.samza.task.TaskContext;
  import org.apache.samza.task.TaskCoordinator;
  import org.apache.samza.util.HighResolutionClock;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Set;
  /**
   * Abstract base class for all stream operator implementations.
+  *
+  * @param <M> type of the input to this operator
+  * @param <RM> type of the results of applying this operator
   */
  public abstract class OperatorImpl<M, RM> {
 +  private static final Logger LOG = LoggerFactory.getLogger(OperatorImpl.class);
    private static final String METRICS_GROUP = OperatorImpl.class.getName();
  
    private boolean initialized;
      long endNs = this.highResClock.nanoTime();
      this.handleMessageNs.update(endNs - startNs);
  
-     results.forEach(rm -> this.registeredOperators.forEach(op -> op.onMessage(rm, collector, coordinator)));
+     results.forEach(rm ->
+         this.registeredOperators.forEach(op ->
 -            op.onMessage(rm, collector, coordinator)));
++            op.onMessage(rm, collector, coordinator)));
 +
 +    WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
 +    if (watermarkFn != null) {
 +      // check whether there is new watermark emitted from the user function
 +      Long outputWm = watermarkFn.getOutputWatermark();
 +      propagateWatermark(outputWm, collector, coordinator);
 +    }
    }
  
    /**
diff --cc samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
   */
  package org.apache.samza.operators.impl;
  
 +import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Lists;
- import org.apache.commons.lang3.tuple.Pair;
 +import com.google.common.collect.Multimap;
 +import java.util.stream.Collectors;
  import org.apache.samza.config.Config;
 +import org.apache.samza.container.TaskContextImpl;
 +import org.apache.samza.job.model.JobModel;
+ import org.apache.samza.operators.KV;
  import org.apache.samza.operators.StreamGraphImpl;
  import org.apache.samza.operators.functions.JoinFunction;
  import org.apache.samza.operators.functions.PartialJoinFunction;
@@@ -272,70 -249,4 +275,68 @@@ public class OperatorImplGraph 
        }
      };
    }
-     if (opSpec instanceof OutputOperatorSpec) {
-       OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
-       if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
-         outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(), input);
-       }
 +
 +  private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) {
 +    return !Collections.disjoint(streamGraph.getInputOperators().keySet(), streamGraph.getOutputStreams().keySet());
 +  }
 +
 +  /**
 +   * Calculates the number of tasks that produce to each intermediate stream
 +   * @param streamToConsumerTasks mapping from input streams to their consumer tasks
 +   * @param intermediateToInputStreams mapping from intermediate streams to their input streams
 +   * @return mapping from intermediate stream to producer task count
 +   */
 +  static Map<SystemStream, Integer> getProducerTaskCountForIntermediateStreams(
 +      Multimap<SystemStream, String> streamToConsumerTasks,
 +      Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
 +    Map<SystemStream, Integer> result = new HashMap<>();
 +    intermediateToInputStreams.asMap().entrySet().forEach(entry -> {
 +        result.put(entry.getKey(),
 +            entry.getValue().stream()
 +                .flatMap(systemStream -> streamToConsumerTasks.get(systemStream).stream())
 +                .collect(Collectors.toSet()).size());
 +      });
 +    return result;
 +  }
 +
 +  /**
 +   * Calculates the mapping from input streams to the tasks that consume them
 +   * @param jobModel the {@link JobModel}
 +   * @return mapping from each input stream to its consumer task names
 +   */
 +  static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel jobModel) {
 +    Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
 +    jobModel.getContainers().values().forEach(containerModel -> {
 +        containerModel.getTasks().values().forEach(taskModel -> {
 +            taskModel.getSystemStreamPartitions().forEach(ssp -> {
 +                streamToConsumerTasks.put(ssp.getSystemStream(), taskModel.getTaskName().getTaskName());
 +              });
 +          });
 +      });
 +    return streamToConsumerTasks;
 +  }
 +
 +  /**
 +   * Calculates the mapping from intermediate streams to the input streams that feed them
 +   * @param streamGraph the user {@link StreamGraphImpl} instance
 +   * @return mapping from intermediate streams to input streams
 +   */
 +  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) {
 +    Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create();
 +    streamGraph.getInputOperators().entrySet().stream()
 +        .forEach(
 +            entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams));
 +    return outputToInputStreams;
 +  }
 +
 +  private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec,
 +      Multimap<SystemStream, SystemStream> outputToInputStreams) {
++    if (opSpec instanceof PartitionByOperatorSpec) {
++      PartitionByOperatorSpec spec = (PartitionByOperatorSpec) opSpec;
++      outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(), input);
 +    } else {
 +      Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs();
 +      nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams));
 +    }
 +  }
  }
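
To make the counting rule above concrete, a small hedged sketch (it mirrors the TestOperatorImplGraph unit test later in this diff; the method is package-private, so a caller would live in org.apache.samza.operators.impl, and java.util.Arrays is assumed imported):

    // int1 is fed by input1 (consumed by t0, t1) and input2 (consumed by t1, t2).
    // The distinct consumer tasks of its inputs are {t0, t1, t2}, so 3 tasks produce to int1.
    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
    streamToTasks.putAll(new SystemStream("sys", "input1"), Arrays.asList("t0", "t1"));
    streamToTasks.putAll(new SystemStream("sys", "input2"), Arrays.asList("t1", "t2"));

    SystemStream int1 = new SystemStream("sys", "int1");
    Multimap<SystemStream, SystemStream> intToInputs = HashMultimap.create();
    intToInputs.put(int1, new SystemStream("sys", "input1"));
    intToInputs.put(int1, new SystemStream("sys", "input2"));

    int producerCount = OperatorImplGraph
        .getProducerTaskCountForIntermediateStreams(streamToTasks, intToInputs)
        .get(int1);  // == 3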
diff --cc samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 0000000,072b31d..28b8dba
mode 000000,100644..100644
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@@ -1,0 -1,82 +1,108 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.samza.operators.impl;
+ import org.apache.samza.SamzaException;
+ import org.apache.samza.config.Config;
++import org.apache.samza.container.TaskContextImpl;
+ import org.apache.samza.operators.KV;
+ import org.apache.samza.operators.spec.OperatorSpec;
+ import org.apache.samza.operators.spec.OutputStreamImpl;
+ import org.apache.samza.operators.spec.PartitionByOperatorSpec;
++import org.apache.samza.system.ControlMessage;
++import org.apache.samza.system.EndOfStreamMessage;
+ import org.apache.samza.system.OutgoingMessageEnvelope;
++import org.apache.samza.system.StreamMetadataCache;
+ import org.apache.samza.system.SystemStream;
++import org.apache.samza.system.WatermarkMessage;
+ import org.apache.samza.task.MessageCollector;
+ import org.apache.samza.task.TaskContext;
+ import org.apache.samza.task.TaskCoordinator;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.function.Function;
+ /**
+  * An operator that sends messages to an output {@link SystemStream} for repartitioning them.
+  */
+ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
+   private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
+   private final SystemStream systemStream;
+   private final Function<? super M, ? extends K> keyFunction;
+   private final Function<? super M, ? extends V> valueFunction;
++  private final String taskName;
++  private final ControlMessageSender controlMessageSender;
+   PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) {
+     this.partitionByOpSpec = partitionByOpSpec;
+     OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream();
+     if (!outputStream.isKeyedOutput()) {
+       throw new SamzaException("Output stream for repartitioning must be a keyed stream.");
+     }
+     this.systemStream = new SystemStream(
+         outputStream.getStreamSpec().getSystemName(),
+         outputStream.getStreamSpec().getPhysicalName());
+     this.keyFunction = partitionByOpSpec.getKeyFunction();
+     this.valueFunction = partitionByOpSpec.getValueFunction();
++    this.taskName = context.getTaskName().getTaskName();
++    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache();
++    this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
+   }
+   @Override
+   protected void handleInit(Config config, TaskContext context) {
+   }
+   @Override
+   public Collection<Void> handleMessage(M message, MessageCollector collector,
+       TaskCoordinator coordinator) {
+     K key = keyFunction.apply(message);
+     V value = valueFunction.apply(message);
+     collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+     return Collections.emptyList();
+   }
+   @Override
+   protected void handleClose() {
+   }
+   @Override
+   protected OperatorSpec<M, Void> getOperatorSpec() {
+     return partitionByOpSpec;
+   }
++
++  @Override
++  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
++    sendControlMessage(new EndOfStreamMessage(taskName), collector);
++  }
++
++  @Override
++  protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
++    sendControlMessage(new WatermarkMessage(watermark, taskName), collector);
++    return watermark;
++  }
++
++  private void sendControlMessage(ControlMessage message, MessageCollector collector) {
++    SystemStream outputStream = partitionByOpSpec.getOutputStream().getStreamSpec().toSystemStream();
++    controlMessageSender.send(message, outputStream, collector);
++  }
+ }
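
For readers tracing the new end-of-stream/watermark plumbing, a hedged illustration of the handshake these overrides enable (the consuming side is the aggregateEndOfStream/aggregateWatermark calls in StreamOperatorTask further down; this fragment is illustrative and not part of the patch):

    // Each producing task stamps its task name on the control message it sends to the
    // intermediate stream. A consumer may declare the stream ended (or advance its
    // watermark) only after hearing from every producer; the expected producer count
    // comes from OperatorImplGraph.getProducerTaskCountForIntermediateStreams.
    Set<String> seenProducers = new HashSet<>();

    boolean allProducersDone(EndOfStreamMessage eos, int producerCount) {
      seenProducers.add(eos.getTaskName());
      return seenProducers.size() == producerCount;  // true => safe to propagate end-of-stream
    }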
diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
   */
  package org.apache.samza.operators.spec;
  
- import org.apache.commons.lang3.tuple.Pair;
+ import org.apache.samza.operators.KV;
+ import org.apache.samza.serializers.Serde;
 +import org.apache.samza.operators.functions.WatermarkFunction;
  import org.apache.samza.system.StreamSpec;
  
- import java.util.function.BiFunction;
  /**
   * The spec for an operator that receives incoming messages from an input stream
   * and converts them to the input message.
@@@ -47,12 -49,15 +50,20 @@@ public class InputOperatorSpec<K, V> ex
      return this.streamSpec;
    }
  
-   public BiFunction<K, V, M> getMsgBuilder() {
-     return this.msgBuilder;
+   public Serde<K> getKeySerde() {
+     return keySerde;
+   }
+   public Serde<V> getValueSerde() {
+     return valueSerde;
+   }
+   public boolean isKeyedInput() {
+     return isKeyedInput;
    }
-   }
 +
 +  @Override
 +  public WatermarkFunction getWatermarkFn() {
 +    return null;
++  }
  }
diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@@ -19,7 -19,8 +19,9 @@@
  package org.apache.samza.operators.spec;
  
  import org.apache.samza.annotation.InterfaceStability;
 +import org.apache.samza.operators.functions.WatermarkFunction;
+ import org.apache.samza.operators.MessageStream;
+ import org.apache.samza.operators.MessageStreamImpl;
  
  import java.util.Collection;
  import java.util.LinkedHashSet;
diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@@ -51,12 -46,7 +48,12 @@@ public class OutputOperatorSpec<M> exte
     * The {@link OutputStreamImpl} that this operator is sending its output to.
     * @return the {@link OutputStreamImpl} for this operator if any, else null.
     */
-   public OutputStreamImpl<?, ?, M> getOutputStream() {
+   public OutputStreamImpl<M> getOutputStream() {
      return this.outputStream;
    }
 +
 +  @Override
 +  public WatermarkFunction getWatermarkFn() {
 +    return null;
 +  }
  }
diff --cc samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index 0000000,a2bb5f2..42eeb4b
mode 000000,100644..100644
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@@ -1,0 -1,76 +1,81 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.samza.operators.spec;
+ import org.apache.samza.operators.KV;
++import org.apache.samza.operators.functions.WatermarkFunction;
+ import java.util.function.Function;
+ /**
+  * The spec for an operator that re-partitions a {@link org.apache.samza.operators.MessageStream} to a
+  * {@link org.apache.samza.system.SystemStream}. This is usually paired with a corresponding
+  * {@link InputOperatorSpec} that consumes the {@link org.apache.samza.system.SystemStream} again.
+  * <p>
+  * This is a terminal operator and does not allow further operator chaining.
+  *
+  * @param <M> the type of message
+  * @param <K> the type of key in the message
+  * @param <V> the type of value in the message
+  */
+ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
+   private final OutputStreamImpl<KV<K, V>> outputStream;
+   private final Function<? super M, ? extends K> keyFunction;
+   private final Function<? super M, ? extends V> valueFunction;
+   /**
+    * Constructs a {@link PartitionByOperatorSpec} to send messages to the provided {@code outputStream}
+    *
+    * @param outputStream the {@link OutputStreamImpl} to send messages to
+    * @param keyFunction the {@link Function} for extracting the key from the message
+    * @param valueFunction the {@link Function} for extracting the value from the message
+    * @param opId the unique ID of this {@link PartitionByOperatorSpec} in the graph
+    */
+   PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
+       Function<? super M, ? extends K> keyFunction,
+       Function<? super M, ? extends V> valueFunction, int opId) {
+     super(OpCode.PARTITION_BY, opId);
+     this.outputStream = outputStream;
+     this.keyFunction = keyFunction;
+     this.valueFunction = valueFunction;
+   }
+   /**
+    * The {@link OutputStreamImpl} that this operator is sending its output to.
+    * @return the {@link OutputStreamImpl} for this operator if any, else null.
+    */
+   public OutputStreamImpl<KV<K, V>> getOutputStream() {
+     return this.outputStream;
+   }
+   public Function<? super M, ? extends K> getKeyFunction() {
+     return keyFunction;
+   }
+   public Function<? super M, ? extends V> getValueFunction() {
+     return valueFunction;
+   }
++  @Override
++  public WatermarkFunction getWatermarkFn() {
++    return null;
++  }
+ }
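
For orientation, a minimal hedged sketch of the fluent call that produces this spec (using the two-argument partitionBy(keyFn, valueFn) form exercised by the tests later in this diff; inputStream and outputStream stand for any compatible streams):

    // partitionBy creates a PartitionByOperatorSpec together with an
    // OutputStreamImpl<KV<K, V>> for the intermediate stream, and pairs it with an
    // InputOperatorSpec that reads the repartitioned KVs back.
    inputStream
        .partitionBy(Object::hashCode, Object::toString)  // K = Integer, V = String
        .sendTo(outputStream);                            // chaining resumes on KV<Integer, String>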
diff --cc samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
  package org.apache.samza.serializers;
  
  import java.util.Arrays;
  import org.apache.samza.SamzaException;
 -import org.apache.samza.message.EndOfStreamMessage;
 -import org.apache.samza.message.MessageType;
 -import org.apache.samza.message.WatermarkMessage;
 +import org.apache.samza.system.EndOfStreamMessage;
 +import org.apache.samza.system.MessageType;
 +import org.apache.samza.system.WatermarkMessage;
- import org.codehaus.jackson.type.TypeReference;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  
  /**
diff --cc samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
   */
  package org.apache.samza.task;
  
- import org.apache.commons.lang3.tuple.Pair;
  import org.apache.samza.application.StreamApplication;
  import org.apache.samza.config.Config;
 +import org.apache.samza.system.EndOfStreamMessage;
 +import org.apache.samza.system.MessageType;
  import org.apache.samza.operators.ContextManager;
+ import org.apache.samza.operators.KV;
  import org.apache.samza.operators.StreamGraphImpl;
  import org.apache.samza.operators.impl.InputOperatorImpl;
  import org.apache.samza.operators.impl.OperatorImplGraph;
@@@ -111,21 -105,7 +111,21 @@@ public final class StreamOperatorTask i
      SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
      InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
      if (inputOpImpl != null) {
 -      inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator);
 +      switch (MessageType.of(ime.getMessage())) {
 +        case USER_MESSAGE:
-           inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
++          inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator);
 +          break;
 +
 +        case END_OF_STREAM:
 +          EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
 +          inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
 +          break;
 +
 +        case WATERMARK:
 +          WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
 +          inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator);
 +          break;
 +      }
      }
    }
  
diff --cc samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 0000000,c59c0cc..918da26
mode 000000,100644..100644
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@@ -1,0 -1,111 +1,112 @@@
 -    StreamEdge inputEdge = new StreamEdge(inputSpec);
 -    StreamEdge outputEdge = new StreamEdge(outputSpec);
 -    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true);
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.samza.execution;
+ import org.apache.samza.config.Config;
+ import org.apache.samza.config.MapConfig;
+ import org.apache.samza.config.SerializerConfig;
+ import org.apache.samza.operators.KV;
+ import org.apache.samza.operators.MessageStream;
+ import org.apache.samza.operators.OutputStream;
+ import org.apache.samza.operators.StreamGraphImpl;
+ import org.apache.samza.runtime.ApplicationRunner;
+ import org.apache.samza.serializers.JsonSerdeV2;
+ import org.apache.samza.serializers.KVSerde;
+ import org.apache.samza.serializers.Serde;
+ import org.apache.samza.serializers.SerializableSerde;
+ import org.apache.samza.serializers.StringSerde;
+ import org.apache.samza.system.StreamSpec;
+ import org.junit.Test;
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.stream.Collectors;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ import static org.mockito.Mockito.doReturn;
+ import static org.mockito.Mockito.mock;
+ public class TestJobNode {
+   @Test
+   public void testAddSerdeConfigs() {
+     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+     StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+     StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
+     StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
+     doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+     doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
+     doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+     streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+     MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+     OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
+     input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+     JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
++    Config config = new MapConfig();
++    StreamEdge inputEdge = new StreamEdge(inputSpec, config);
++    StreamEdge outputEdge = new StreamEdge(outputSpec, config);
++    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config);
+     jobNode.addInEdge(inputEdge);
+     jobNode.addOutEdge(outputEdge);
+     jobNode.addInEdge(repartitionEdge);
+     jobNode.addOutEdge(repartitionEdge);
+     Map<String, String> configs = new HashMap<>();
+     jobNode.addSerdeConfigs(configs);
+     MapConfig mapConfig = new MapConfig(configs);
+     Config serializers = mapConfig.subset("serializers.registry.", true);
+     // make sure that the serializers deserialize correctly
+     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+     Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap(
+         e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
+         e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+     ));
+     assertEquals(2, serializers.size());
+     String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
+     String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
+     assertTrue(deserializedSerdes.containsKey(inputKeySerde));
+     assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+     assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
+     assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+     String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
+     String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
+     assertTrue(deserializedSerdes.containsKey(outputKeySerde));
+     assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+     assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
+     assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+     String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
+     String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
+     assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
+     assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+     assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
+     assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+   }
+ }
  
diff --cc samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
  package org.apache.samza.operators.impl;
  
- import org.apache.commons.lang3.tuple.Pair;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Multimap;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +import org.apache.samza.Partition;
  import org.apache.samza.config.Config;
 +import org.apache.samza.config.JobConfig;
 +import org.apache.samza.config.MapConfig;
 +import org.apache.samza.container.TaskContextImpl;
 +import org.apache.samza.container.TaskName;
 +import org.apache.samza.job.model.ContainerModel;
 +import org.apache.samza.job.model.JobModel;
 +import org.apache.samza.job.model.TaskModel;
  import org.apache.samza.metrics.MetricsRegistryMap;
+ import org.apache.samza.operators.KV;
  import org.apache.samza.operators.MessageStream;
  import org.apache.samza.operators.OutputStream;
  import org.apache.samza.operators.StreamGraphImpl;
@@@ -45,9 -30,12 +45,14 @@@ import org.apache.samza.operators.funct
  import org.apache.samza.operators.functions.MapFunction;
  import org.apache.samza.operators.spec.OperatorSpec.OpCode;
  import org.apache.samza.runtime.ApplicationRunner;
+ import org.apache.samza.serializers.IntegerSerde;
+ import org.apache.samza.serializers.KVSerde;
+ import org.apache.samza.serializers.NoOpSerde;
++import org.apache.samza.serializers.Serde;
+ import org.apache.samza.serializers.StringSerde;
  import org.apache.samza.system.StreamSpec;
  import org.apache.samza.system.SystemStream;
 +import org.apache.samza.system.SystemStreamPartition;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.TaskContext;
  import org.apache.samza.task.TaskCoordinator;
@@@ -121,6 -105,43 +123,47 @@@ public class TestOperatorImplGraph 
    }
  
    @Test
 -    TaskContext mockTaskContext = mock(TaskContext.class);
+   public void testPartitionByChain() {
+     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+     when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system"));
+     when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+         .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system"));
+     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+     MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+     OutputStream<KV<Integer, String>> outputStream = streamGraph
+         .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+     inputStream
+         .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+         .sendTo(outputStream);
++    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
++    when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
++    JobModel jobModel = mock(JobModel.class);
++    when(jobModel.getContainers()).thenReturn(Collections.emptyMap());
++    when(mockTaskContext.getJobModel()).thenReturn(jobModel);
+     OperatorImplGraph opImplGraph =
+         new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+     assertEquals(1, inputOpImpl.registeredOperators.size());
+     OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+     assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator
+     assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode());
+     InputOperatorImpl repartitionedInputOpImpl =
+         opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream"));
+     assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
+     OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next();
+     assertEquals(0, sendToOpImpl.registeredOperators.size());
+     assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+   }
+   @Test
    public void testBroadcastChain() {
      ApplicationRunner mockRunner = mock(ApplicationRunner.class);
      when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
      StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
  
      JoinFunction mockJoinFunction = mock(JoinFunction.class);
-     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
-     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
+     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
      inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
  
 -    TaskContext mockTaskContext = mock(TaskContext.class);
 +    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
      when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
      OperatorImplGraph opImplGraph =
          new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
        }
      };
    }
-     BiFunction msgBuilder = mock(BiFunction.class);
-     MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-     MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).filter(m -> true);
-     MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-     Function mockFn = mock(Function.class);
-     OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-     OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
-     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha").sendTo(om1);
 +
 +  @Test
 +  public void testGetStreamToConsumerTasks() {
 +    String system = "test-system";
 +    String stream0 = "test-stream-0";
 +    String stream1 = "test-stream-1";
 +
 +    SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0));
 +    SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1));
 +    SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0));
 +
 +    TaskName task0 = new TaskName("Task 0");
 +    TaskName task1 = new TaskName("Task 1");
 +    Set<SystemStreamPartition> ssps = new HashSet<>();
 +    ssps.add(ssp0);
 +    ssps.add(ssp2);
 +    TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0));
 +    ContainerModel cm0 = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0));
 +    TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1));
 +    ContainerModel cm1 = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1));
 +
 +    Map<String, ContainerModel> cms = new HashMap<>();
 +    cms.put(cm0.getProcessorId(), cm0);
 +    cms.put(cm1.getProcessorId(), cm1);
 +
 +    JobModel jobModel = new JobModel(new MapConfig(), cms, null);
 +    Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel);
 +    assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
 +    assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
 +  }
 +
 +  @Test
 +  public void testGetOutputToInputStreams() {
 +    Map<String, String> configMap = new HashMap<>();
 +    configMap.put(JobConfig.JOB_NAME(), "test-app");
 +    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
 +    Config config = new MapConfig(configMap);
 +
 +    /**
 +     * The graph looks like the following (number of partitions in parentheses):
 +     *
 +     *                                    input1 -> map -> join -> partitionBy (10) -> output1
 +     *                                                       |
 +     *                                     input2 -> filter -|
 +     *                                                       |
 +     *           input3 -> filter -> partitionBy -> map -> join -> output2
 +     *
 +     */
 +    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
 +    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
 +    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
 +
 +    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
 +    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
 +
 +    ApplicationRunner runner = mock(ApplicationRunner.class);
 +    when(runner.getStreamSpec("input1")).thenReturn(input1);
 +    when(runner.getStreamSpec("input2")).thenReturn(input2);
 +    when(runner.getStreamSpec("input3")).thenReturn(input3);
 +    when(runner.getStreamSpec("output1")).thenReturn(output1);
 +    when(runner.getStreamSpec("output2")).thenReturn(output2);
 +
 +    // intermediate streams used in tests
 +    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", "test-app-1-partition_by-10", "default-system");
 +    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", "test-app-1-partition_by-6", "default-system");
 +    when(runner.getStreamSpec("test-app-1-partition_by-10"))
 +        .thenReturn(int1);
 +    when(runner.getStreamSpec("test-app-1-partition_by-6"))
 +        .thenReturn(int2);
 +
 +    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
++    Serde inputSerde = new NoOpSerde<>();
++    MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m -> m);
++    MessageStream m2 = streamGraph.getInputStream("input2", inputSerde).filter(m -> true);
++    MessageStream m3 = streamGraph.getInputStream("input3", inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
++    OutputStream<Object> om1 = streamGraph.getOutputStream("output1");
++    OutputStream<Object> om2 = streamGraph.getOutputStream("output2");
++
++    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha", m -> m).sendTo(om1);
 +    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
 +
 +    Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
 +    Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
 +    assertEquals(inputs.size(), 2);
 +    assertTrue(inputs.contains(input1.toSystemStream()));
 +    assertTrue(inputs.contains(input2.toSystemStream()));
 +
 +    inputs = outputToInput.get(int2.toSystemStream());
 +    assertEquals(inputs.size(), 1);
 +    assertEquals(inputs.iterator().next(), input3.toSystemStream());
 +  }
 +
 +  @Test
 +  public void testGetProducerTaskCountForIntermediateStreams() {
 +    /**
 +     * The task assignment looks like the following:
 +     *
 +     * input1 -----> task0, task1 -----> int1
 +     *                                    ^
 +     * input2 ------> task1, task2--------|
 +     *                                    v
 +     * input3 ------> task1 -----------> int2
 +     *
 +     */
 +
 +    SystemStream input1 = new SystemStream("system1", "input1");
 +    SystemStream input2 = new SystemStream("system2", "input2");
 +    SystemStream input3 = new SystemStream("system2", "input3");
 +
 +    SystemStream int1 = new SystemStream("system1", "int1");
 +    SystemStream int2 = new SystemStream("system1", "int2");
 +
 +
 +    String task0 = "Task 0";
 +    String task1 = "Task 1";
 +    String task2 = "Task 2";
 +
 +    Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
 +    streamToConsumerTasks.put(input1, task0);
 +    streamToConsumerTasks.put(input1, task1);
 +    streamToConsumerTasks.put(input2, task1);
 +    streamToConsumerTasks.put(input2, task2);
 +    streamToConsumerTasks.put(input3, task1);
 +    streamToConsumerTasks.put(int1, task0);
 +    streamToConsumerTasks.put(int1, task1);
 +    streamToConsumerTasks.put(int2, task0);
 +
 +    Multimap<SystemStream, SystemStream> intermediateToInputStreams = HashMultimap.create();
 +    intermediateToInputStreams.put(int1, input1);
 +    intermediateToInputStreams.put(int1, input2);
 +
 +    intermediateToInputStreams.put(int2, input2);
 +    intermediateToInputStreams.put(int2, input3);
 +
 +    Map<SystemStream, Integer> counts = OperatorImplGraph.getProducerTaskCountForIntermediateStreams(
 +        streamToConsumerTasks, intermediateToInputStreams);
 +    assertTrue(counts.get(int1) == 3);
 +    assertTrue(counts.get(int2) == 2);
 +  }
  }
diff --cc samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
  package org.apache.samza.runtime;
  
  import com.google.common.collect.ImmutableList;
- import java.util.*;
 -import java.lang.reflect.Field;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Set;
  import java.util.stream.Collectors;
  import org.apache.samza.application.StreamApplication;
  import org.apache.samza.config.ApplicationConfig;
  import org.apache.samza.config.JobConfig;
@@@ -49,8 -55,6 +54,7 @@@ import static org.junit.Assert.assertNo
  import static org.mockito.Matchers.anyObject;
  import static org.mockito.Matchers.anyString;
  import static org.mockito.Mockito.*;
- import static org.powermock.api.mockito.PowerMockito.mockStatic;
 +import static org.powermock.api.mockito.PowerMockito.doReturn;
  
  
  @RunWith(PowerMockRunner.class)
@@@ -72,20 -76,44 +76,21 @@@ public class TestLocalApplicationRunne
      StreamApplication app = mock(StreamApplication.class);
      doNothing().when(app).init(anyObject(), anyObject());
  
 -    ExecutionPlanner planner = mock(ExecutionPlanner.class);
 -    Field plannerField = runner.getClass().getSuperclass().getDeclaredField("planner");
 -    plannerField.setAccessible(true);
 -    plannerField.set(runner, planner);
 -
      StreamManager streamManager = mock(StreamManager.class);
 -    Field streamManagerField = runner.getClass().getSuperclass().getDeclaredField("streamManager");
 -    streamManagerField.setAccessible(true);
 -    streamManagerField.set(runner, streamManager);
 -    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
 +    doReturn(streamManager).when(runner).getStreamManager();
  
 -    ExecutionPlan plan = new ExecutionPlan() {
 -      @Override
 -      public List<JobConfig> getJobConfigs() {
 -        return Collections.emptyList();
 -      }
 -
 -      @Override
 -      public List<StreamSpec> getIntermediateStreams() {
 -        return Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"));
 -      }
 -
 -      @Override
 -      public String getPlanAsJson()
 -          throws Exception {
 -        return "";
 -      }
 -    };
 -    when(planner.plan(anyObject())).thenReturn(plan);
 +    ExecutionPlan plan = mock(ExecutionPlan.class);
 +    when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
 +    when(plan.getPlanAsJson()).thenReturn("");
 +    doReturn(plan).when(runner).getExecutionPlan(any(), any());
  
-     mockStatic(CoordinationUtilsFactory.class);
      CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
-     when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
+     JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+     when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+     PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
  
 -    LocalApplicationRunner spy = spy(runner);
      try {
 -      spy.run(app);
 +      runner.run(app);
      } catch (Throwable t) {
        assertNotNull(t); //no jobs exception
      }
diff --cc samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 8493cf1,0000000..d2f0184
mode 100644,000000..100644
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@@ -1,103 -1,0 +1,112 @@@
-       streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
-         .partitionBy(PageView::getMemberId)
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.samza.test.controlmessages;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import org.apache.samza.application.StreamApplication;
 +import org.apache.samza.config.JobConfig;
 +import org.apache.samza.config.JobCoordinatorConfig;
 +import org.apache.samza.config.MapConfig;
 +import org.apache.samza.config.TaskConfig;
 +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
++import org.apache.samza.operators.KV;
++import org.apache.samza.operators.functions.MapFunction;
 +import org.apache.samza.runtime.LocalApplicationRunner;
 +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 +import org.apache.samza.test.controlmessages.TestData.PageView;
 +import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 +import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 +import org.apache.samza.test.util.ArraySystemFactory;
 +import org.apache.samza.test.util.Base64Serializer;
 +import org.junit.Test;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +
 +/**
 + * This test uses an array as a bounded input source, and does a partitionBy() and sink() after reading the input.
 + * It verifies that the pipeline stops and that the number of output messages equals the number of input messages.
 + */
 +public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
 +
 +  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
 +
 +  @Test
 +  public void testPipeline() throws Exception {
 +    Random random = new Random();
 +    int count = 10;
 +    PageView[] pageviews = new PageView[count];
 +    for (int i = 0; i < count; i++) {
 +      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length)];
 +      int memberId = random.nextInt(10);
 +      pageviews[i] = new PageView(pagekey, memberId);
 +    }
 +
 +    int partitionCount = 4;
 +    Map<String, String> configs = new HashMap<>();
 +    configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
 +    configs.put("streams.PageView.samza.system", "test");
 +    configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews));
 +    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
 +
 +    configs.put(JobConfig.JOB_NAME(), "test-eos-job");
 +    configs.put(JobConfig.PROCESSOR_ID(), "1");
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
 +    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 +
 +    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
 +    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
 +    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
 +    configs.put("systems.kafka.samza.key.serde", "int");
 +    configs.put("systems.kafka.samza.msg.serde", "json");
 +    configs.put("systems.kafka.default.stream.replication.factor", "1");
 +    configs.put("job.default.system", "kafka");
 +
 +    configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
 +    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
 +
 +    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
 +    List<PageView> received = new ArrayList<>();
 +    final StreamApplication app = (streamGraph, cfg) -> {
-             received.add(m);
++      streamGraph.<KV<String, PageView>>getInputStream("PageView")
++        .map(Values.create())
++        .partitionBy(pv -> pv.getMemberId(), pv -> pv)
 +        .sink((m, collector, coordinator) -> {
++            received.add(m.getValue());
 +          });
 +    };
 +    runner.run(app);
 +    runner.waitForFinish();
 +
 +    assertEquals(received.size(), count * partitionCount);
 +  }
++
++  public static final class Values {
++    public static <K, V, M extends KV<K, V>> MapFunction<M, V> create() {
++      return (M m) -> m.getValue();
++    }
++  }
 +}
diff --cc samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index d9202d3,0000000..7da0e77
mode 100644,000000..100644
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@@ -1,204 -1,0 +1,206 @@@
-       streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
-           .partitionBy(PageView::getMemberId)
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.samza.test.controlmessages;
 +
 +import java.lang.reflect.Field;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import org.apache.samza.Partition;
 +import org.apache.samza.application.StreamApplication;
 +import org.apache.samza.config.Config;
 +import org.apache.samza.config.JobConfig;
 +import org.apache.samza.config.JobCoordinatorConfig;
 +import org.apache.samza.config.MapConfig;
 +import org.apache.samza.config.TaskConfig;
 +import org.apache.samza.container.SamzaContainer;
 +import org.apache.samza.container.TaskInstance;
 +import org.apache.samza.container.TaskName;
 +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 +import org.apache.samza.metrics.MetricsRegistry;
++import org.apache.samza.operators.KV;
 +import org.apache.samza.operators.impl.InputOperatorImpl;
 +import org.apache.samza.operators.impl.OperatorImpl;
 +import org.apache.samza.operators.impl.OperatorImplGraph;
 +import org.apache.samza.operators.impl.TestOperatorImpl;
 +import org.apache.samza.operators.spec.OperatorSpec;
 +import org.apache.samza.processor.StreamProcessor;
 +import org.apache.samza.processor.TestStreamProcessorUtil;
 +import org.apache.samza.runtime.LocalApplicationRunner;
 +import org.apache.samza.runtime.TestLocalApplicationRunner;
 +import org.apache.samza.serializers.IntegerSerdeFactory;
 +import org.apache.samza.serializers.StringSerdeFactory;
 +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 +import org.apache.samza.system.IncomingMessageEnvelope;
 +import org.apache.samza.system.SystemAdmin;
 +import org.apache.samza.system.SystemConsumer;
 +import org.apache.samza.system.SystemFactory;
 +import org.apache.samza.system.SystemProducer;
 +import org.apache.samza.system.SystemStreamPartition;
 +import org.apache.samza.task.AsyncStreamTaskAdapter;
 +import org.apache.samza.task.StreamOperatorTask;
 +import org.apache.samza.task.TestStreamOperatorTask;
 +import org.apache.samza.test.controlmessages.TestData.PageView;
 +import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 +import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 +import org.apache.samza.test.util.SimpleSystemAdmin;
 +import org.apache.samza.test.util.TestStreamConsumer;
 +import org.junit.Test;
 +import scala.collection.JavaConverters;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +
 +public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
 +
 +  private static int offset = 1;
 +  private static final String TEST_SYSTEM = "test";
 +  private static final String TEST_STREAM = "PageView";
 +  private static final int PARTITION_COUNT = 2;
 +  private static final SystemStreamPartition SSP0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
 +  private static final SystemStreamPartition SSP1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
 +
 +  private final static List<IncomingMessageEnvelope> TEST_DATA = new ArrayList<>();
 +  static {
 +    TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
 +    TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 2));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 4));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 3));
 +    TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
 +    TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP0));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP1));
 +  }
 +
 +  public final static class TestSystemFactory implements SystemFactory {
 +    @Override
 +    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
 +      return new TestStreamConsumer(TEST_DATA);
 +    }
 +
 +    @Override
 +    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
 +      return null;
 +    }
 +
 +    @Override
 +    public SystemAdmin getAdmin(String systemName, Config config) {
 +      return new SimpleSystemAdmin(config);
 +    }
 +  }
 +
 +  private static IncomingMessageEnvelope createIncomingMessage(Object message, SystemStreamPartition ssp) {
 +    return new IncomingMessageEnvelope(ssp, String.valueOf(offset++), "", message);
 +  }
 +
 +  @Test
 +  public void testWatermark() throws Exception {
 +    Map<String, String> configs = new HashMap<>();
 +    configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
 +    configs.put("streams.PageView.samza.system", "test");
 +    configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
 +
 +    configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
 +    configs.put(JobConfig.PROCESSOR_ID(), "1");
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
 +    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 +
 +    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
 +    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
 +    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
 +    configs.put("systems.kafka.samza.key.serde", "int");
 +    configs.put("systems.kafka.samza.msg.serde", "json");
 +    configs.put("systems.kafka.default.stream.replication.factor", "1");
 +    configs.put("job.default.system", "kafka");
 +
 +    configs.put("serializers.registry.int.class", IntegerSerdeFactory.class.getName());
 +    configs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
 +    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
 +
 +    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
 +    List<PageView> received = new ArrayList<>();
 +    final StreamApplication app = (streamGraph, cfg) -> {
-               received.add(m);
++      streamGraph.<KV<String, PageView>>getInputStream("PageView")
++          .map(EndOfStreamIntegrationTest.Values.create())
++          .partitionBy(pv -> pv.getMemberId(), pv -> pv)
 +          .sink((m, collector, coordinator) -> {
++              received.add(m.getValue());
 +            });
 +    };
 +    runner.run(app);
 +    Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
 +
 +    runner.waitForFinish();
 +
 +    StreamOperatorTask task0 = tasks.get("Partition 0");
 +    OperatorImplGraph graph = TestStreamOperatorTask.getOperatorImplGraph(task0);
 +    OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
 +    assertEquals(TestOperatorImpl.getInputWatermark(pb), 4);
 +    assertEquals(TestOperatorImpl.getOutputWatermark(pb), 4);
 +    OperatorImpl sink = getOperator(graph, OperatorSpec.OpCode.SINK);
 +    assertEquals(TestOperatorImpl.getInputWatermark(sink), 3);
 +    assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
 +
 +    StreamOperatorTask task1 = tasks.get("Partition 1");
 +    graph = TestStreamOperatorTask.getOperatorImplGraph(task1);
 +    pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
 +    assertEquals(TestOperatorImpl.getInputWatermark(pb), 3);
 +    assertEquals(TestOperatorImpl.getOutputWatermark(pb), 3);
 +    sink = getOperator(graph, OperatorSpec.OpCode.SINK);
 +    assertEquals(TestOperatorImpl.getInputWatermark(sink), 3);
 +    assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
 +  }
 +
 +  Map<String, StreamOperatorTask> getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception {
 +    StreamProcessor processor = TestLocalApplicationRunner.getProcessors(runner).iterator().next();
 +    SamzaContainer container = TestStreamProcessorUtil.getContainer(processor);
 +    Map<TaskName, TaskInstance> taskInstances = JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava();
 +    Map<String, StreamOperatorTask> tasks = new HashMap<>();
 +    for (Map.Entry<TaskName, TaskInstance> entry : taskInstances.entrySet()) {
 +      AsyncStreamTaskAdapter adapter = (AsyncStreamTaskAdapter) entry.getValue().task();
 +      Field field = AsyncStreamTaskAdapter.class.getDeclaredField("wrappedTask");
 +      field.setAccessible(true);
 +      StreamOperatorTask task = (StreamOperatorTask) field.get(adapter);
 +      tasks.put(entry.getKey().getTaskName(), task);
 +    }
 +    return tasks;
 +  }
 +
 +  OperatorImpl getOperator(OperatorImplGraph graph, OperatorSpec.OpCode opCode) {
 +    for (InputOperatorImpl input : graph.getAllInputOperators()) {
 +      Set<OperatorImpl> nextOps = TestOperatorImpl.getNextOperators(input);
 +      while (!nextOps.isEmpty()) {
 +        OperatorImpl op = nextOps.iterator().next();
 +        if (TestOperatorImpl.getOpCode(op) == opCode) {
 +          return op;
 +        } else {
 +          nextOps = TestOperatorImpl.getNextOperators(op);
 +        }
 +      }
 +    }
 +    return null;
 +  }
 +}