SAMZA-1388: Flaky test - TestStatefulTask#testShouldStartAndRestore
authorJacob Maes <jmaes@linkedin.com>
Fri, 11 Aug 2017 22:50:12 +0000 (15:50 -0700)
committerJacob Maes <jmaes@linkedin.com>
Fri, 11 Aug 2017 22:50:12 +0000 (15:50 -0700)
I believe the problem originated from SAMZA-173.

The core issue is testShouldRestoreStore was not updated to expect 6 messages after 2 more messages were added to testShouldStartTaskForFirstTime.

Fixed the issue and refactored the code so the 2 methods wouldn't disagree again in the future.

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #269 from jmakes/samza-1388

samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala

index e5b6756..734487b 100644 (file)
@@ -30,8 +30,16 @@ import org.junit.{AfterClass, BeforeClass, Test}
 import scala.collection.JavaConverters._
 
 object TestStatefulTask {
-    val STORE_NAME = "mystore"
-    val STATE_TOPIC_STREAM = "mystoreChangelog"
+  val STORE_NAME = "mystore"
+  val STATE_TOPIC_STREAM = "mystoreChangelog"
+
+  // Messages with one dupe and one delete. A negative string means delete. See StateStoreTestTask.testProcess()
+  val MESSAGES_SEND_1 = List("1", "2", "3", "2", "99", "-99")
+  val MESSAGES_RECV_1 = List("1", "2", "3", "2", "99", null)
+  val STORE_CONTENTS_1 = List("1", "2", "3")
+
+  val MESSAGES_SEND_2 = List("4", "5", "5")
+  val MESSAGES_RECV_2 = List("4", "5", "5")
 
   @BeforeClass
   def beforeSetupServers {
@@ -47,13 +55,13 @@ object TestStatefulTask {
 /**
  * Test that does the following:
  * 1. Start a single partition of TestStateStoreTask using ThreadJobFactory.
- * 2. Send four messages to input (1,2,3,2), which contain one dupe (2).
+ * 2. Send MESSAGES_SEND_1, which contains a dupe and a delete.
  * 3. Validate that all messages were received by TestStateStoreTask.
- * 4. Validate that TestStateStoreTask called store.put() for all four messages, and that the messages ended up in the mystore topic.
+ * 4. Validate that TestStateStoreTask called store.put() for all messages, and that the messages ended up in the mystore topic.
  * 5. Kill the job.
  * 6. Start the job again.
- * 7. Validate that the job restored all messages (1,2,3) to the store.
- * 8. Send three more messages to input (4,5,5), and validate that TestStateStoreTask receives them.
+ * 7. Validate that the job restored all messages STORE_CONTENTS_1 to the store.
+ * 8. Send three more messages to input MESSAGES_SEND_2, and validate that TestStateStoreTask receives them.
  * 9. Kill the job again.
  */
 class TestStatefulTask extends StreamTaskTestUtil {
@@ -86,23 +94,12 @@ class TestStatefulTask extends StreamTaskTestUtil {
     assertEquals(0, task.received.size)
 
     // Send some messages to input stream.
-    send(task, "1")
-    send(task, "2")
-    send(task, "3")
-    send(task, "2")
-    send(task, "99")
-    send(task, "-99")
+    TestStatefulTask.MESSAGES_SEND_1.foreach(m => send(task, m))
 
     // Validate that messages appear in store stream.
     val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
 
-    assertEquals(6, messages.length)
-    assertEquals("1", messages(0))
-    assertEquals("2", messages(1))
-    assertEquals("3", messages(2))
-    assertEquals("2", messages(3))
-    assertEquals("99", messages(4))
-    assertNull(messages(5))
+    assertEquals(TestStatefulTask.MESSAGES_RECV_1, messages)
 
     stopJob(job)
   }
@@ -111,52 +108,34 @@ class TestStatefulTask extends StreamTaskTestUtil {
     val (job, task) = startJob
 
     // Validate that restored has expected data.
-    assertEquals(3, task.asInstanceOf[StateStoreTestTask].restored.size)
-    assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("1"))
-    assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("2"))
-    assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains("3"))
+    assertEquals(TestStatefulTask.STORE_CONTENTS_1.length, task.asInstanceOf[StateStoreTestTask].restored.size)
+    TestStatefulTask.STORE_CONTENTS_1.foreach(m =>  assertTrue(task.asInstanceOf[StateStoreTestTask].restored.contains(m)))
 
     var count = 0
 
-    // We should get the original four messages in the stream (1,2,3,2).
+    // We should get the original size messages in the stream (1,2,3,2,99,-99).
     // Note that this will trigger four new outgoing messages to the STATE_TOPIC.
-    while (task.received.size < 4 && count < 100) {
+    while (task.received.size < TestStatefulTask.MESSAGES_RECV_1.length && count < 100) {
       Thread.sleep(600)
       count += 1
     }
 
     assertTrue("Timed out waiting to received messages. Received thus far: " + task.received.size, count < 100)
 
-    // Reset the count down latch after the 4 messages come in.
+    // Reset the count down latch after the 6 messages come in.
     task.awaitMessage
 
     // Send some messages to input stream.
-    send(task, "4")
-    send(task, "5")
-    send(task, "5")
+    TestStatefulTask.MESSAGES_SEND_2.foreach(m => send(task, m))
+
+    val expectedMessagesRcvd =  TestStatefulTask.MESSAGES_RECV_1 ++ // From initial start.
+                                TestStatefulTask.MESSAGES_RECV_1 ++ // From second startup.
+                                TestStatefulTask.MESSAGES_RECV_2    // From sending in this method.
 
     // Validate that messages appear in store stream.
-    val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, 14, "testShouldRestoreStore")
-
-    assertEquals(15, messages.length)
-    // From initial start.
-    assertEquals("1", messages(0))
-    assertEquals("2", messages(1))
-    assertEquals("3", messages(2))
-    assertEquals("2", messages(3))
-    assertEquals("99", messages(4))
-    assertNull(messages(5))
-    // From second startup.
-    assertEquals("1", messages(6))
-    assertEquals("2", messages(7))
-    assertEquals("3", messages(8))
-    assertEquals("2", messages(9))
-    assertEquals("99", messages(10))
-    assertNull(messages(11))
-    // From sending in this method.
-    assertEquals("4", messages(12))
-    assertEquals("5", messages(13))
-    assertEquals("5", messages(14))
+    val messages = readAll(TestStatefulTask.STATE_TOPIC_STREAM, expectedMessagesRcvd.length-1, "testShouldRestoreStore")
+
+    assertEquals(expectedMessagesRcvd, messages)
 
     stopJob(job)
   }