SAMZA-1758: Configuring a timeout for TestRunner to execute the SamzaJob
authorsanil15 <sanil.jain15@gmail.com>
Thu, 28 Jun 2018 22:48:28 +0000 (15:48 -0700)
committerBoris S <bshkolnik@linkedin.com>
Thu, 28 Jun 2018 22:48:28 +0000 (15:48 -0700)
Author: sanil15 <sanil.jain15@gmail.com>
Author: Sanil Jain <snjain@linkedin.com>

Reviewers: Bharath Kumarasubramanian <codin.martial@gmail.com>

Closes #563 from Sanil15/SAMZA-1758 and squashes the following commits:

5b198a0f [Sanil Jain] Fixing a bug for preconditions
6d2fd334 [Sanil Jain] Adressing Review
1eb3847c [Sanil Jain] Removing explicit handling of TimeoutException and adding more docs
0a9689c9 [sanil15] Addressing Review, moving tests from SamzaFailureTests, improving doc, adding validation
b79b5628 [sanil15] Using ExceptionUtils to get full stack trace, adding more docs
dd816ff8 [sanil15] Addressing review, using waitForFinish(timeout) to configure a timeout for TestRunner, adding some Failure tests
903c1162 [sanil15] Configuring a timeout for TestRunner to execute the SamzaJob

samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java

index dee10c6..6e647d9 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.test.framework;
 
 import com.google.common.base.Preconditions;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.InMemorySystemConfig;
@@ -36,6 +39,7 @@ import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.KV;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
@@ -55,6 +59,7 @@ import org.apache.samza.task.AsyncStreamTask;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.test.framework.stream.CollectionStream;
 import org.apache.samza.test.framework.system.CollectionStreamSystemSpec;
+import org.junit.Assert;
 
 
 /**
@@ -267,22 +272,33 @@ public class TestRunner {
     return this;
   }
 
+
   /**
    * Utility to run a test configured using TestRunner
+   *
+   * @param timeout time to wait for the high level application or low level task to finish. This timeout does not include
+   *                input stream initialization time or the assertion time over output streams. This timeout just accounts
+   *                for time that samza job takes run. Samza job won't be invoked with negative or zero timeout
+   * @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode
    */
-  public void run() {
+  public void run(Duration timeout) {
     Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null),
         "TestRunner should run for Low Level Task api or High Level Application Api");
+    Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(),
+        "Timeouts should be positive");
     final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
     if (app == null) {
       runner.runTask();
-      runner.waitForFinish();
     } else {
       runner.run(app);
-      runner.waitForFinish();
+    }
+    boolean timedOut = !runner.waitForFinish(timeout);
+    Assert.assertFalse("Timed out waiting for application to finish", timedOut);
+    ApplicationStatus status = runner.status(app);
+    if (status.getStatusCode() == ApplicationStatus.StatusCode.UnsuccessfulFinish) {
+      throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
     }
   }
-
   /**
    * Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the
    * TestRunner in order to assert over the streams (ex output streams).
index c991b8c..ad25cae 100644 (file)
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.framework;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.samza.test.framework.stream.CollectionStream;
@@ -41,9 +42,26 @@ public class AsyncStreamTaskIntegrationTest {
         .of(MyAsyncStreamTask.class)
         .addInputStream(input)
         .addOutputStream(output)
-        .run();
+        .run(Duration.ofSeconds(2));
 
     Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
         IsIterableContainingInOrder.contains(outputList.toArray()));
   }
+
+  /**
+   * Job should fail because it times out too soon
+   */
+  @Test(expected = AssertionError.class)
+  public void testSamzaJobTimeoutFailureForAsyncTask() {
+    List<Integer> inputList = Arrays.asList(1, 2, 3, 4);
+
+    CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
+    CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+
+    TestRunner
+        .of(MyAsyncStreamTask.class)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .run(Duration.ofMillis(1));
+  }
 }
index 307c1b5..8ac40e1 100644 (file)
  */
 package org.apache.samza.test.framework;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.test.controlmessages.TestData;
 import org.apache.samza.test.framework.stream.CollectionStream;
 import static org.apache.samza.test.controlmessages.TestData.PageView;
 import org.junit.Assert;
@@ -34,7 +37,12 @@ import org.junit.Test;
 
 public class StreamApplicationIntegrationTest {
 
-  final StreamApplication app = (streamGraph, cfg) -> {
+  final StreamApplication pageViewFilter = (streamGraph, cfg) -> {
+    streamGraph.<KV<String, TestData.PageView>>getInputStream("PageView").map(
+        StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox"));
+  };
+
+  final StreamApplication pageViewParition = (streamGraph, cfg) -> {
     streamGraph.<KV<String, PageView>>getInputStream("PageView")
         .map(Values.create())
         .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
@@ -62,11 +70,11 @@ public class StreamApplicationIntegrationTest {
     CollectionStream output = CollectionStream.empty("test", "Output", 10);
 
     TestRunner
-        .of(app)
+        .of(pageViewParition)
         .addInputStream(input)
         .addOutputStream(output)
         .addOverrideConfig("job.default.system", "test")
-        .run();
+        .run(Duration.ofMillis(1500));
 
     Assert.assertEquals(TestRunner.consumeStream(output, 10000).get(random.nextInt(count)).size(), 1);
   }
@@ -76,4 +84,44 @@ public class StreamApplicationIntegrationTest {
       return (M m) -> m.getValue();
     }
   }
+
+  /**
+   * Job should fail since it is missing config "job.default.system" for partitionBy Operator
+   */
+  @Test(expected = SamzaException.class)
+  public void testSamzaJobStartMissingConfigFailureForStreamApplication() {
+
+    CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", new ArrayList<>());
+    CollectionStream output = CollectionStream.empty("test", "Output", 10);
+
+    TestRunner
+        .of(pageViewParition)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .run(Duration.ofMillis(1000));
+  }
+
+  /**
+   * Null page key is passed in input data which should fail filter logic
+   */
+  @Test(expected = SamzaException.class)
+  public void testSamzaJobFailureForStreamApplication() {
+    Random random = new Random();
+    int count = 10;
+    List<TestData.PageView> pageviews = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = i;
+      pageviews.add(new TestData.PageView(null, memberId));
+    }
+
+    CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", pageviews);
+    CollectionStream output = CollectionStream.empty("test", "Output", 1);
+
+    TestRunner.of(pageViewFilter)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .addOverrideConfig("job.default.system", "test")
+        .run(Duration.ofMillis(1000));
+  }
 }
index e052539..2cc5977 100644 (file)
 
 package org.apache.samza.test.framework;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.samza.SamzaException;
 import org.apache.samza.test.framework.stream.CollectionStream;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
@@ -36,10 +38,27 @@ public class StreamTaskIntegrationTest {
     CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList);
     CollectionStream output = CollectionStream.empty("test", "output");
 
-    TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run();
+    TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
 
     Assert.assertThat(TestRunner.consumeStream(output, 1000).get(0),
         IsIterableContainingInOrder.contains(outputList.toArray()));
   }
 
+  /**
+   * Samza job logic expects integers, but doubles are passed here which results in failure
+   */
+  @Test(expected = SamzaException.class)
+  public void testSamzaJobFailureForSyncTask() {
+    List<Double> inputList = Arrays.asList(1.2, 2.3, 3.33, 4.5);
+
+    CollectionStream<Double> input = CollectionStream.of("test", "doubles", inputList);
+    CollectionStream output = CollectionStream.empty("test", "output");
+
+    TestRunner
+        .of(MyStreamTestTask.class)
+        .addInputStream(input)
+        .addOutputStream(output)
+        .run(Duration.ofSeconds(1));
+  }
+
 }