SAMZA-1648: Integration Test Framework & Collection Stream Impl
authorsanil15 <sanil.jain15@gmail.com>
Fri, 22 Jun 2018 22:19:50 +0000 (15:19 -0700)
committerxiliu <xiliu@linkedin.com>
Fri, 22 Jun 2018 22:19:50 +0000 (15:19 -0700)
This patch provides the following:
- TestRunner: Tesing Wrapper to run Samza job
- CollectionStream: Acts as a stream descriptor for in memory collections
- CollectionStreamSystem: System associated with a Collection
- StreamUtils: Utilities over streams
- Sample example of tests

Link to SEP: https://cwiki.apache.org/confluence/display/SAMZA/SEP-12%3A+Integration+Test+Framework

Author: sanil15 <sanil.jain15@gmail.com>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes #501 from Sanil15/SAMZA-1648

samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java [new file with mode: 0644]
samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java [new file with mode: 0644]
samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java [new file with mode: 0644]

diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
new file mode 100644 (file)
index 0000000..dee10c6
--- /dev/null
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.test.framework.system.CollectionStreamSystemSpec;
+
+
+/**
+ * TestRunner provides apis to quickly set up tests for Samza low level and high level apis. Default running mode
+ * for test is Single container without any distributed coordination service. Test runner maintains global job config
+ * {@code configs} that are used to run the Samza job
+ *
+ * For single container mode following configs are set by default
+ *  <ol>
+ *    <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li>
+ *    <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
+ *    <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
+ *    <li>"job.name" = "test-samza"</li>
+ *    <li>"processor.id" = "1"</li>
+ *    <li>"inmemory.scope = " Scope id generated to isolate the run for InMemorySystem</li>
+ *  </ol>
+ *
+ */
+public class TestRunner {
+
+  private static final String JOB_NAME = "test-samza";
+  public enum Mode {
+    SINGLE_CONTAINER, MULTI_CONTAINER
+  }
+
+  private Map<String, String> configs;
+  private Map<String, CollectionStreamSystemSpec> systems;
+  private Class taskClass;
+  private StreamApplication app;
+  private String testId;
+  private SystemFactory factory;
+
+  /**
+   * Mode defines single or multi container running configuration, by default a single container configuration is assumed
+   */
+  private Mode mode;
+
+  private TestRunner() {
+    this.testId = RandomStringUtils.random(10, true, true);
+    this.systems = new HashMap<String, CollectionStreamSystemSpec>();
+    this.configs = new HashMap<>();
+    this.mode = Mode.SINGLE_CONTAINER;
+    this.factory = new InMemorySystemFactory();
+    configs.put(InMemorySystemConfig.INMEMORY_SCOPE, testId);
+    configs.put(JobConfig.JOB_NAME(), JOB_NAME);
+    configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1");
+    configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
+    configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+  }
+
+  /**
+   * Constructs a new {@link TestRunner} from following components
+   * @param taskClass represent a class containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask}
+   */
+  private TestRunner(Class taskClass) {
+    this();
+    Preconditions.checkNotNull(taskClass);
+    configs.put(TaskConfig.TASK_CLASS(), taskClass.getName());
+    this.taskClass = taskClass;
+  }
+
+  /**
+   * Constructs a new {@link TestRunner} from following components
+   * @param app represent a class containing Samza job logic implementing {@link StreamApplication}
+   */
+  private TestRunner(StreamApplication app) {
+    this();
+    Preconditions.checkNotNull(app);
+    this.app = app;
+  }
+
+  /**
+   * Registers a system with TestRunner if not already registered and configures all the system configs to global
+   * job configs
+   */
+  private void registerSystem(String systemName) {
+    if (!systems.containsKey(systemName)) {
+      systems.put(systemName, CollectionStreamSystemSpec.create(systemName));
+      configs.putAll(systems.get(systemName).getSystemConfigs());
+    }
+  }
+
+  /**
+   * Creates an instance of {@link TestRunner} for Low Level Samza Api
+   * @param taskClass represent a class extending either {@link StreamTask} or {@link AsyncStreamTask}
+   * @return a {@link TestRunner} for {@code taskClass}
+   */
+  public static TestRunner of(Class taskClass) {
+    Preconditions.checkNotNull(taskClass);
+    Preconditions.checkState(
+        StreamTask.class.isAssignableFrom(taskClass) || AsyncStreamTask.class.isAssignableFrom(taskClass));
+    return new TestRunner(taskClass);
+  }
+
+  /**
+   * Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api
+   * @param app represent a class representing Samza job by implementing {@link StreamApplication}
+   * @return a {@link TestRunner} for {@code app}
+   */
+  public static TestRunner of(StreamApplication app) {
+    Preconditions.checkNotNull(app);
+    return new TestRunner(app);
+  }
+
+  /**
+   * Only adds a config from {@code config} to global {@code configs} if they dont exist in it.
+   * @param config represents the {@link Config} supposed to be added to global configs
+   * @return calling instance of {@link TestRunner} with added configs if they don't exist
+   */
+  public TestRunner addConfigs(Config config) {
+    Preconditions.checkNotNull(config);
+    config.forEach(this.configs::putIfAbsent);
+    return this;
+  }
+
+  /**
+   * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already
+   * exisiting in {@code configs}
+   * @param key key of the config
+   * @param value value of the config
+   * @return calling instance of {@link TestRunner} with added config
+   */
+  public TestRunner addOverrideConfig(String key, String value) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+    configs.put(key, value);
+    return this;
+  }
+
+  /**
+   * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs.
+   * <p>
+   * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with
+   * {@link TestRunner} if not registered already. Then it creates and initializes the stream partitions with messages for
+   * the registered System
+   * <p>
+   * @param stream represents the stream that is supposed to be configured with {@link TestRunner}
+   * @return calling instance of {@link TestRunner} with {@code stream} configured with it
+   */
+  public TestRunner addInputStream(CollectionStream stream) {
+    Preconditions.checkNotNull(stream);
+    registerSystem(stream.getSystemName());
+    initializeInput(stream);
+    stream.setTestId(testId);
+    if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
+      configs.put(TaskConfig.INPUT_STREAMS(),
+          configs.get(TaskConfig.INPUT_STREAMS()).concat("," + stream.getSystemName() + "." + stream.getPhysicalName()));
+    } else {
+      configs.put(TaskConfig.INPUT_STREAMS(), stream.getSystemName() + "." + stream.getPhysicalName());
+    }
+    stream.getStreamConfig().forEach((key, val) -> {
+        configs.putIfAbsent((String) key, (String) val);
+      });
+
+    return this;
+  }
+
+  /**
+   * Creates an in memory stream with {@link InMemorySystemFactory} and initializes the metadata for the stream.
+   * Initializes each partition of that stream with messages from {@code stream.getInitPartitions}
+   *
+   * @param stream represents the stream to initialize with the in memory system
+   * @param <T> can represent a message or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a
+   *            {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope}
+   *            and value represents the message
+   */
+  private <T> void initializeInput(CollectionStream stream) {
+    Preconditions.checkNotNull(stream);
+    Preconditions.checkState(stream.getInitPartitions().size() >= 1);
+    String streamName = stream.getStreamName();
+    String systemName = stream.getSystemName();
+    Map<Integer, Iterable<T>> partitions = stream.getInitPartitions();
+    StreamSpec spec = new StreamSpec(streamName, stream.getPhysicalName(), systemName, partitions.size());
+    factory.getAdmin(systemName, new MapConfig(configs)).createStream(spec);
+    SystemProducer producer = factory.getProducer(systemName, new MapConfig(configs), null);
+    partitions.forEach((partitionId, partition) -> {
+        partition.forEach(e -> {
+            Object key = e instanceof KV ? ((KV) e).getKey() : null;
+            Object value = e instanceof KV ? ((KV) e).getValue() : e;
+            producer.send(systemName,
+                new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), key,
+                    value));
+          });
+        producer.send(systemName,
+            new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), null,
+                new EndOfStreamMessage(null)));
+      });
+  }
+
+  /**
+   * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs.
+   * <p>
+   * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with
+   * {@link TestRunner} if not registered already. Then it creates the stream partitions with the registered System
+   * <p>
+   * @param stream represents the stream that is supposed to be configured with {@link TestRunner}
+   * @return calling instance of {@link TestRunner} with {@code stream} configured with it
+   */
+  public TestRunner addOutputStream(CollectionStream stream) {
+    Preconditions.checkNotNull(stream);
+    Preconditions.checkState(stream.getInitPartitions().size() >= 1);
+    registerSystem(stream.getSystemName());
+    stream.setTestId(testId);
+    StreamSpec spec = new StreamSpec(stream.getStreamName(), stream.getPhysicalName(), stream.getSystemName(), stream.getInitPartitions().size());
+    factory
+        .getAdmin(stream.getSystemName(), new MapConfig(configs))
+        .createStream(spec);
+    configs.putAll(stream.getStreamConfig());
+    return this;
+  }
+
+  /**
+   * Utility to run a test configured using TestRunner
+   */
+  public void run() {
+    Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null),
+        "TestRunner should run for Low Level Task api or High Level Application Api");
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    if (app == null) {
+      runner.runTask();
+      runner.waitForFinish();
+    } else {
+      runner.run(app);
+      runner.waitForFinish();
+    }
+  }
+
+  /**
+   * Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the
+   * TestRunner in order to assert over the streams (ex output streams).
+   *
+   * @param stream represents {@link CollectionStream} whose current state of partitions is requested to be fetched
+   * @param timeout poll timeout in Ms
+   * @param <T> represents type of message
+   *
+   * @return a map key of which represents the {@code partitionId} and value represents the current state of the partition
+   *         i.e messages in the partition
+   * @throws InterruptedException Thrown when a blocking poll has been interrupted by another thread.
+   */
+  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Integer timeout) throws InterruptedException {
+    Preconditions.checkNotNull(stream);
+    Preconditions.checkNotNull(stream.getSystemName());
+    String streamName = stream.getStreamName();
+    String systemName = stream.getSystemName();
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    Set<String> streamNames = new HashSet<>();
+    streamNames.add(streamName);
+    SystemFactory factory = new InMemorySystemFactory();
+    HashMap<String, String> config = new HashMap<>();
+    config.put(InMemorySystemConfig.INMEMORY_SCOPE, stream.getTestId());
+    Map<String, SystemStreamMetadata> metadata =
+        factory.getAdmin(systemName, new MapConfig(config)).getSystemStreamMetadata(streamNames);
+    SystemConsumer consumer = factory.getConsumer(systemName, new MapConfig(config), null);
+    metadata.get(stream.getPhysicalName()).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
+        SystemStreamPartition temp = new SystemStreamPartition(systemName, streamName, partition);
+        ssps.add(temp);
+        consumer.register(temp, "0");
+      });
+
+    long t = System.currentTimeMillis();
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>();
+    HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps);
+    while (System.currentTimeMillis() < t + timeout) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = consumer.poll(ssps, 10);
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) {
+        SystemStreamPartition ssp = entry.getKey();
+        output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>());
+        List<IncomingMessageEnvelope> currentBuffer = entry.getValue();
+        Integer totalMessagesToFetch = Integer.valueOf(metadata.get(stream.getStreamName())
+            .getSystemStreamPartitionMetadata()
+            .get(ssp.getPartition())
+            .getNewestOffset());
+        if (output.get(ssp).size() + currentBuffer.size() == totalMessagesToFetch) {
+          didNotReachEndOfStream.remove(entry.getKey());
+          ssps.remove(entry.getKey());
+        }
+        output.get(ssp).addAll(currentBuffer);
+      }
+      if (didNotReachEndOfStream.isEmpty()) {
+        break;
+      }
+    }
+
+    if (!didNotReachEndOfStream.isEmpty()) {
+      throw new IllegalStateException("Could not poll for all system stream partitions");
+    }
+
+    return output.entrySet()
+        .stream()
+        .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
+            entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList())));
+  }
+}
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
new file mode 100644 (file)
index 0000000..b3d9485
--- /dev/null
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.stream;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A CollectionStream represents an in memory stream of messages that can either have single or multiple partitions.
+ * Every CollectionStream is coupled with a {@link org.apache.samza.test.framework.system.CollectionStreamSystemSpec} that
+ * contains all the specification for system
+ *<p>
+ * When sending messages using {@code CollectionStream<KV<K, V>>}, messages use K as key and V as message
+ * When sending messages using {@code CollectionStream<T>}, messages use a nullkey.
+ *</p>
+ * @param <T>
+ *        can represent a message with null key or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a
+ *        {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope}
+ *        and value represents the message of the same
+ */
+public class CollectionStream<T> {
+  private String testId;
+  private final String streamName;
+  private final String physicalName;
+  private final String systemName;
+  private Map<Integer, Iterable<T>> initPartitions;
+  private Map<String, String> streamConfig;
+  private static final String STREAM_TO_SYSTEM = "streams.%s.samza.system";
+  private static final String PHYSICAL_NAME = "streams.%s.samza.physical.name";
+
+  /**
+   * Constructs a new CollectionStream from specified components.
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents name of the stream
+   */
+  private CollectionStream(String systemName, String streamName) {
+    Preconditions.checkNotNull(systemName);
+    Preconditions.checkNotNull(streamName);
+    this.systemName = systemName;
+    this.streamName = streamName;
+    this.streamConfig = new HashMap<>();
+    // TODO: Once SAMZA-1737 is resolved, generate a randomized physical name
+    this.physicalName = streamName;
+    streamConfig.put(String.format(STREAM_TO_SYSTEM, this.streamName), systemName);
+    streamConfig.put(String.format(PHYSICAL_NAME, this.streamName), physicalName);
+  }
+
+
+  /**
+   * Constructs a new CollectionStream with multiple empty partitions from specified components.
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents name of the stream
+   * @param partitionCount represents number of partitions, each of these partitions will be empty
+   */
+  private CollectionStream(String systemName, String streamName, Integer partitionCount) {
+    this(systemName, streamName);
+    Preconditions.checkState(partitionCount > 0);
+    initPartitions = new HashMap<>();
+    for (int i = 0; i < partitionCount; i++) {
+      initPartitions.put(i, new ArrayList<>());
+    }
+  }
+
+  /**
+   * Constructs a new CollectionStream with single partition from specified components.
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents name of the stream
+   * @param initPartition represents the messages that the stream will be intialized with, default partitionId for the
+   *                  this single partition stream is 0
+   */
+  private CollectionStream(String systemName, String streamName, Iterable<T> initPartition) {
+    this(systemName, streamName);
+    Preconditions.checkNotNull(initPartition);
+    initPartitions = new HashMap<>();
+    initPartitions.put(0, initPartition);
+  }
+
+  /**
+   * Constructs a new CollectionStream with multiple partitions from specified components.
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents name of the stream
+   * @param initPartitions represents the partition state, key of the map represents partitionId and value represents
+   *                   the messages that partition will be initialized with
+   */
+  private CollectionStream(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> initPartitions) {
+    this(systemName, streamName);
+    Preconditions.checkNotNull(initPartitions);
+    initPartitions = new HashMap<>(initPartitions);
+  }
+
+  /**
+   * @return The Map of partitions that input stream is supposed to be initialized with, this method is
+   * used internally and should not be used for asserting over streams.
+   * The true state of stream is determined by {@code consmeStream()} of {@link org.apache.samza.test.framework.TestRunner}
+   */
+  public Map<Integer, Iterable<T>> getInitPartitions() {
+    return initPartitions;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public Map<String, String> getStreamConfig() {
+    return streamConfig;
+  }
+
+  public String getTestId() {
+    return testId;
+  }
+
+  public void setTestId(String testId) {
+    this.testId = testId;
+  }
+
+  public String getPhysicalName() {
+    return physicalName;
+  }
+
+  /**
+   * Creates an in memory stream with the name {@code streamName} and initializes the stream to only one partition
+   *
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents the name of the Stream
+   * @param <T> represents the type of each message in a stream
+   * @return an {@link CollectionStream} with only one partition that can contain messages of the type
+   */
+  public static <T> CollectionStream<T> empty(String systemName, String streamName) {
+    return new CollectionStream<>(systemName, streamName, 1);
+  }
+
+  /**
+   * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions
+   * as specified by {@code partitionCount}. These partitions are empty and are supposed to be used by Samza job to produce
+   * messages to.
+   *
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents the name of the Stream
+   * @param partitionCount represents the number of partitions the stream would have
+   * @param <T> represents the type of each message in a stream
+   * @return an empty {@link CollectionStream} with multiple partitions that can contain messages of the type {@code T}
+   */
+  public static <T> CollectionStream<T> empty(String systemName, String streamName, int partitionCount) {
+    return new CollectionStream<>(systemName, streamName, partitionCount);
+  }
+
+  /**
+   * Creates an in memory stream with the name {@code streamName}. Stream is created with single partition having
+   * {@code partitionId} is 0. This partition is intialzied with messages of type T
+   *
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents the name of the Stream
+   * @param partition represents the messages that the {@link org.apache.samza.system.SystemStreamPartition} will be
+   *                  initialized with
+   * @param <T> represents the type of a message in the stream
+   * @return a {@link CollectionStream} with only one partition containing messages of the type {@code T}
+   *
+   */
+  public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> partition) {
+    return new CollectionStream<>(systemName, streamName, partition);
+  }
+
+  /**
+   * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions
+   * as the size of {@code partitions} map. Key of the map {@code partitions} represents the {@code partitionId} of
+   * each {@link org.apache.samza.Partition} for a {@link org.apache.samza.system.SystemStreamPartition} and value is
+   * an Iterable of messages that the {@link org.apache.samza.system.SystemStreamPartition} should be initialized with.
+   *
+   * @param systemName represents name of the system stream is associated with
+   * @param streamName represents the name of the Stream
+   * @param partitions Key of an entry in partitions represents a {@code partitionId} of a {@link org.apache.samza.Partition}
+   *                   and value represents the stream of messages the {@link org.apache.samza.system.SystemStreamPartition}
+   *                   will be initialized with
+   * @param <T> represents the type of a message in the stream
+   * @return a {@link CollectionStream} with multiple partitions each containing messages of the type {@code T}
+   *
+   */
+  public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) {
+    return new CollectionStream<>(systemName, streamName, partitions);
+  }
+}
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
new file mode 100644 (file)
index 0000000..c005c41
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+
+
+/**
+ * CollectionStreamSystem represents a system that interacts with an underlying {@link InMemorySystemFactory} to create
+ * various input and output streams and initialize {@link org.apache.samza.system.SystemStreamPartition} with messages
+ * <p>
+ * Following system level configs are set by default
+ * <ol>
+ *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ * </ol>
+ *
+ */
+public class CollectionStreamSystemSpec {
+  private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
+  private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default";
+
+  private String systemName;
+  private Map<String, String> systemConfigs;
+
+  /**
+   * Constructs a new CollectionStreamSystem from specified components.
+   * <p>
+   * Every {@link CollectionStreamSystemSpec} is assumed to consume from the oldest offset, since stream is in memory and
+   * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
+   * <p>
+   * @param systemName represents unique name of the system
+   */
+  private CollectionStreamSystemSpec(String systemName) {
+    this.systemName = systemName;
+    systemConfigs = new HashMap<String, String>();
+    systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName());
+    systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest");
+  }
+
+  public String getSystemName() {
+    return systemName;
+  }
+
+  public Map<String, String> getSystemConfigs() {
+    return systemConfigs;
+  }
+
+  /**
+   * Creates a {@link CollectionStreamSystemSpec} with name {@code name}
+   * @param name represents name of the {@link CollectionStreamSystemSpec}
+   * @return an instance of {@link CollectionStreamSystemSpec}
+   */
+  public static CollectionStreamSystemSpec create(String name) {
+    Preconditions.checkState(name != null);
+    return new CollectionStreamSystemSpec(name);
+  }
+}
+
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
new file mode 100644 (file)
index 0000000..c991b8c
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.test.framework.stream.CollectionStream;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class AsyncStreamTaskIntegrationTest {
+
+  @Test
+  public void testAsyncTaskWithSinglePartition() throws Exception {
+    List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
+
+    CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
+    CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+
+    TestRunner
+        .of(MyAsyncStreamTask.class)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .run();
+
+    Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
+        IsIterableContainingInOrder.contains(outputList.toArray()));
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyAsyncStreamTask.java
new file mode 100644 (file)
index 0000000..347e766
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import java.util.Random;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+
+
+public class MyAsyncStreamTask implements AsyncStreamTask {
+  @Override
+  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator,
+      final TaskCallback callback) {
+    // Mimic a random callback delay ans send message
+    RestCall call = new RestCall(envelope, collector, callback);
+    call.start();
+  }
+}
+
+class RestCall extends Thread {
+  static Random random = new Random();
+  IncomingMessageEnvelope envelope;
+  MessageCollector messageCollector;
+  TaskCallback callback;
+
+  RestCall(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCallback callback) {
+    this.envelope = envelope;
+    this.callback = callback;
+    this.messageCollector = collector;
+  }
+
+  @Override
+  public void run() {
+    try {
+      // Let the thread sleep for a while.
+      Thread.sleep(random.nextInt(150));
+    } catch (InterruptedException e) {
+      System.out.println("Thread " + this.getName() + " interrupted.");
+    }
+    Integer obj = (Integer) envelope.getMessage();
+    messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("async-test", "ints-out"), obj * 10));
+    callback.complete();
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java b/samza-test/src/test/java/org/apache/samza/test/framework/MyStreamTestTask.java
new file mode 100644 (file)
index 0000000..c83e461
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+
+
+public class MyStreamTestTask implements StreamTask {
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
+      throws Exception {
+    Integer obj = (Integer) envelope.getMessage();
+    collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "output"), obj * 10));
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
new file mode 100644 (file)
index 0000000..307c1b5
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.framework;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.test.framework.stream.CollectionStream;
+import static org.apache.samza.test.controlmessages.TestData.PageView;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class StreamApplicationIntegrationTest {
+
+  final StreamApplication app = (streamGraph, cfg) -> {
+    streamGraph.<KV<String, PageView>>getInputStream("PageView")
+        .map(Values.create())
+        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
+        .sink((m, collector, coordinator) -> {
+            collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "Output"),
+                m.getKey(), m.getKey(),
+                m));
+          });
+  };
+
+  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
+
+  @Test
+  public void testHighLevelApi() throws Exception {
+    Random random = new Random();
+    int count = 10;
+    List<PageView> pageviews = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = i;
+      pageviews.add(new PageView(pagekey, memberId));
+    }
+
+    CollectionStream<PageView> input = CollectionStream.of("test", "PageView", pageviews);
+    CollectionStream output = CollectionStream.empty("test", "Output", 10);
+
+    TestRunner
+        .of(app)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .addOverrideConfig("job.default.system", "test")
+        .run();
+
+    Assert.assertEquals(TestRunner.consumeStream(output, 10000).get(random.nextInt(count)).size(), 1);
+  }
+
+  public static final class Values {
+    public static <K, V, M extends KV<K, V>> MapFunction<M, V> create() {
+      return (M m) -> m.getValue();
+    }
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
new file mode 100644 (file)
index 0000000..e052539
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.test.framework.stream.CollectionStream;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamTaskIntegrationTest {
+
+  @Test
+  public void testSyncTaskWithSinglePartition() throws Exception {
+    List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
+    List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
+
+    CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList);
+    CollectionStream output = CollectionStream.empty("test", "output");
+
+    TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run();
+
+    Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
+        IsIterableContainingInOrder.contains(outputList.toArray()));
+  }
+
+}