SAMZA-1277: Add a static merge() operator that takes all streams to merge as input
authorPrateek Maheshwari <pmaheshw@linkedin.com>
Wed, 10 May 2017 22:10:02 +0000 (15:10 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 10 May 2017 22:10:02 +0000 (15:10 -0700)
Also updated documentation for join and partitionBy.

Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>

Closes #182 from prateekm/documentation-cleanup

samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
samza-core/src/test/java/org/apache/samza/example/MergeExample.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java

index 6d5b784..b081869 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.function.Function;
 
@@ -104,7 +105,7 @@ public interface MessageStream<M> {
    * <p>
    * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
    * <p>
-   * <b>Note:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
+   * <b>Warning:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
    *
    * @param window the window to group and process messages from this {@link MessageStream}
    * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
@@ -121,7 +122,9 @@ public interface MessageStream<M> {
    * Messages in each stream are retained for the provided {@code ttl} and join results are
    * emitted as matches are found.
    * <p>
-   * <b>Note:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
+   * Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
+   * <p>
+   * <b>Warning:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
    *
    * @param otherStream the other {@link MessageStream} to be joined with
    * @param joinFn the function to join messages from this and the other {@link MessageStream}
@@ -136,6 +139,8 @@ public interface MessageStream<M> {
 
   /**
    * Merges all {@code otherStreams} with this {@link MessageStream}.
+   * <p>
+   * The merged stream contains messages from all streams in the order they arrive.
    *
    * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
    * @return the merged {@link MessageStream}
@@ -143,11 +148,38 @@ public interface MessageStream<M> {
   MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams);
 
   /**
-   * Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
-   * them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
+   * Merges all {@code streams}.
+   * <p>
+   * The merged {@link MessageStream} contains messages from all {@code streams} in the order they arrive.
+   *
+   * @param streams {@link MessageStream}s to be merged
+   * @return the merged {@link MessageStream}
+   * @throws IllegalArgumentException if {@code streams} is empty
+   */
+  static <T> MessageStream<T> mergeAll(Collection<? extends MessageStream<? extends T>> streams) {
+    if (streams.isEmpty()) {
+      throw new IllegalArgumentException("No streams to merge.");
+    }
+    ArrayList<MessageStream<T>> messageStreams = new ArrayList<>((Collection<MessageStream<T>>) streams);
+    MessageStream<T> firstStream = messageStreams.remove(0);
+    return firstStream.merge(messageStreams);
+  }
+
+  /**
+   * Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new
+   * intermediate stream on the {@code job.default.system}. This intermediate stream is both an output and
+   * input to the job.
+   * <p>
+   * The key and message Serdes configured for the default system must be able to serialize and deserialize
+   * types K and M respectively.
    * <p>
-   * <b>Note</b>: Repartitioned streams are created automatically in the default system. The key and message Serdes
-   * configured for the default system must be able to serialize and deserialize types K and M respectively.
+   * The number of partitions for this intermediate stream is determined as follows:
+   * If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is known,
+   * then number of partitions for this stream is set to the number of partitions in the other input stream.
+   * Else, the number of partitions is set to the value of the {@code job.intermediate.stream.partitions}
+   * configuration, if present.
+   * Else, the number of partitions is set to to the max of number of partitions for all input and output streams
+   * (excluding intermediate streams).
    *
    * @param keyExtractor the {@link Function} to extract the output message key and partition key from
    *                     the input message
diff --git a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
new file mode 100644 (file)
index 0000000..9fbf6d1
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+public class MergeExample implements StreamApplication {
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<Object> inputStream1 = graph.getInputStream("inputStream1", (k, m) -> m);
+    MessageStream<Object> inputStream2 = graph.getInputStream("inputStream2", (k, m) -> m);
+    MessageStream<Object> inputStream3 = graph.getInputStream("inputStream3", (k, m) -> m);
+    OutputStream<Integer, Object, Object> outputStream = graph
+        .getOutputStream("outputStream", Object::hashCode, m -> m);
+
+    MessageStream.mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
+        .sendTo(outputStream);
+  }
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
+    localRunner.run(new MergeExample());
+  }
+}
\ No newline at end of file
index 7402b4a..b2a5e2a 100644 (file)
@@ -22,7 +22,11 @@ import com.google.common.collect.ImmutableList;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.data.*;
+import org.apache.samza.operators.data.MessageType;
+import org.apache.samza.operators.data.TestExtOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestInputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -39,6 +43,7 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -54,6 +59,8 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
@@ -276,6 +283,7 @@ public class TestMessageStreamImpl {
         new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
         new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
 
+    // should compile
     MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
     validateMergeOperator(input1, mergeOutput);
 
@@ -289,6 +297,8 @@ public class TestMessageStreamImpl {
     MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
     MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
     Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
+
+    // should compile
     ms1.merge(otherStreams);
   }
 
@@ -306,6 +316,52 @@ public class TestMessageStreamImpl {
   }
 
   @Test
+  public void testMergeAll() {
+    MessageStream<TestMessageEnvelope> input1 = mock(MessageStreamImpl.class);
+    MessageStream<TestMessageEnvelope> input2 = mock(MessageStreamImpl.class);
+    MessageStream<TestMessageEnvelope> input3 = mock(MessageStreamImpl.class);
+
+    MessageStream.mergeAll(ImmutableList.of(input1, input2, input3));
+
+    ArgumentCaptor<Collection> otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class);
+    verify(input1, times(1)).merge(otherStreamsCaptor.capture());
+    assertEquals(2, otherStreamsCaptor.getValue().size());
+    assertTrue(otherStreamsCaptor.getValue().contains(input2));
+    assertTrue(otherStreamsCaptor.getValue().contains(input3));
+  }
+
+  @Test
+  public void testMergeAllWithRelaxedTypes() {
+    MessageStreamImpl<TestInputMessageEnvelope> input1 = mock(MessageStreamImpl.class);
+    MessageStreamImpl<TestMessageEnvelope> input2 = mock(MessageStreamImpl.class);
+    Collection<MessageStream<? extends TestMessageEnvelope>> streams = ImmutableList.of(input1, input2);
+
+    // should compile
+    MessageStream.mergeAll(streams);
+    ArgumentCaptor<Collection> otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class);
+    verify(input1, times(1)).merge(otherStreamsCaptor.capture());
+    assertEquals(1, otherStreamsCaptor.getValue().size());
+    assertTrue(otherStreamsCaptor.getValue().contains(input2));
+  }
+
+  @Test
+  public <T> void testMergeAllWithNestedTypes() {
+    class MessageEnvelope<TM> { }
+    MessageStream<MessageEnvelope<T>> input1 = mock(MessageStreamImpl.class);
+    MessageStream<MessageEnvelope<T>> input2 = mock(MessageStreamImpl.class);
+    MessageStream<MessageEnvelope<T>> input3 = mock(MessageStreamImpl.class);
+
+    // should compile
+    MessageStream.mergeAll(ImmutableList.of(input1, input2, input3));
+
+    ArgumentCaptor<Collection> otherStreamsCaptor = ArgumentCaptor.forClass(Collection.class);
+    verify(input1, times(1)).merge(otherStreamsCaptor.capture());
+    assertEquals(2, otherStreamsCaptor.getValue().size());
+    assertTrue(otherStreamsCaptor.getValue().contains(input2));
+    assertTrue(otherStreamsCaptor.getValue().contains(input3));
+  }
+
+  @Test
   public void testPartitionBy() {
     Map<String, String> map = new HashMap<>();
     map.put(JobConfig.JOB_DEFAULT_SYSTEM(), "testsystem");