SAMZA-1247: MessageStreamImpl#merge shouldn't mutate input collection
author Prateek Maheshwari <pmaheshw@linkedin.com>
Thu, 4 May 2017 19:28:58 +0000 (12:28 -0700)
committer Jacob Maes <jmaes@linkedin.com>
Thu, 4 May 2017 19:28:58 +0000 (12:28 -0700)
Also fixes
SAMZA-1253: MessageStream.merge operator broken for nested types

Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>, Jagadish <jvenkatr@linkedin.com>

Closes #159 from prateekm/merge-fixes

samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java

index c406a93..91ef44c 100644 (file)
@@ -127,13 +127,11 @@ public interface MessageStream<M> {
 
   /**
    * Merge all {@code otherStreams} with this {@link MessageStream}.
-   * <p>
-   * The merging streams must have the same messages of type {@code M}.
    *
    * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
    * @return the merged {@link MessageStream}
    */
-  MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams);
+  MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams);
 
   /**
    * Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
index b9adeed..4694262 100644 (file)
@@ -38,9 +38,11 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
 
@@ -183,14 +185,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   }
 
   @Override
-  public MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams) {
+  public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
     MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
-
-    otherStreams.add(this);
-    otherStreams.forEach(other -> {
-        OperatorSpec mergeOperatorSepc =
+    List<MessageStream<M>> streamsToMerge = new ArrayList<>((Collection<MessageStream<M>>) otherStreams);
+    streamsToMerge.add(this);
+    
+    streamsToMerge.forEach(stream -> {
+        OperatorSpec mergeOperatorSpec =
             OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId());
-        ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(mergeOperatorSepc);
+        ((MessageStreamImpl<M>) stream).registeredOperatorSpecs.add(mergeOperatorSpec);
       });
     return nextStream;
   }
index 44870fd..7402b4a 100644 (file)
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -260,16 +261,37 @@ public class TestMessageStreamImpl {
   @Test
   public void testMerge() {
     MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
-    Collection<MessageStream<? extends TestMessageEnvelope>> others = new ArrayList<MessageStream<? extends TestMessageEnvelope>>() { {
-        this.add(new MessageStreamImpl<>(mockGraph));
-        this.add(new MessageStreamImpl<>(mockGraph));
-      } };
+    Collection<MessageStream<TestMessageEnvelope>> others = ImmutableList.of(
+        new MessageStreamImpl<>(mockGraph), new MessageStreamImpl<>(mockGraph));
     MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
     validateMergeOperator(merge1, mergeOutput);
 
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+
+  @Test
+  public void testMergeWithRelaxedTypes() {
+    MessageStream<TestMessageEnvelope> input1 = new MessageStreamImpl<>(mockGraph);
+    Collection<MessageStream<? extends TestMessageEnvelope>> others = ImmutableList.of(
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
+        new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
+
+    MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
+    validateMergeOperator(input1, mergeOutput);
+
     others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput));
   }
 
+  @Test
+  public <T> void testMergeWithNestedTypes() {
+    class MessageEnvelope<TM> { }
+    MessageStream<MessageEnvelope<T>> ms1 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+    MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+    MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+    Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
+    ms1.merge(otherStreams);
+  }
+
   private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
     Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);