SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging partition...
authorAditya Toomula <atoomula@linkedin.com>
Wed, 30 May 2018 01:48:02 +0000 (18:48 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Wed, 30 May 2018 01:48:02 +0000 (18:48 -0700)
Author: Aditya Toomula <atoomula@linkedin.com>

Reviewers: Jagadish<jagadish@apache.org>

Closes #533 from atoomula/chooser

samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala

index 38e2cfa..f50f27d 100644 (file)
@@ -196,12 +196,12 @@ class BootstrappingChooser(
           val systemStream = systemStreamPartition.getSystemStream
 
           updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
-        }
 
-        // If the offset we just read is the same as the offset for the last
-        // message (newest) in this system stream partition, then we have read
-        // all messages, and can mark this SSP as bootstrapped.
-        checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+          // If the offset we just read is the same as the offset for the last
+          // message (newest) in this system stream partition, then we have read
+          // all messages, and can mark this SSP as bootstrapped.
+          checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+        }
       }
 
       envelope
index a017518..1a99355 100644 (file)
@@ -37,10 +37,14 @@ import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
-  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
-  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
-  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3);
-  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1)
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2)
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3)
+  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4)
+  val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "124", null, 5)
+  val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "125", null, 6)
+  val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "124", null, 7)
+  val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "125", null, 8)
 
   /**
    * Helper function to create metadata for a single envelope with a single offset.
@@ -202,6 +206,56 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
   }
 
   @Test
+  def testChooserShouldHaveNoLaggingSspsAfterCaughtUp {
+    val mockMessageChooser = new MockMessageChooser
+    val sspMetadataMap =
+      Map(envelope3.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null),
+      envelope2.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null))
+    val metadata = new SystemStreamMetadata(
+      envelope3.getSystemStreamPartition.getStream,
+      sspMetadataMap.asJava)
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
+    val chooser = new BootstrappingChooser(mockMessageChooser, Map(envelope2.getSystemStreamPartition.getSystemStream -> metadata),
+      new BootstrappingChooserMetrics(), systemAdmins)
+
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.register(envelope3.getSystemStreamPartition, "1")
+    chooser.start
+
+    // There should be 2 lagging partitions
+    assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 2), chooser.systemStreamLagCounts)
+
+    assertNull(chooser.choose)
+    chooser.update(envelope5) // ssp1 is now marked as not lagging
+    assertEquals(envelope5, chooser.choose)
+
+    // There should be 1 lagging partition
+    assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts)
+
+    // Update with one more envelope from ssp1 and make sure that systemStreamLagCounts is still 1
+    chooser.update(envelope6)
+    assertEquals(null, chooser.choose) // no events are expected to be chosen from ssp1 until lagging ssp0 has envelopes
+
+    chooser.update(envelope3)
+    assertEquals(envelope6, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+
+    // There should still be 1 lagging partition
+    assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts)
+
+    chooser.update(envelope7)
+    assertEquals(envelope7, chooser.choose)  // ssp0 is now marked as not lagging
+
+    // chooser should not have any lagging partitions
+    assertTrue(chooser.laggingSystemStreamPartitions.isEmpty)
+    assertTrue(chooser.systemStreamLagCounts.isEmpty)
+
+    chooser.update(envelope8)
+    assertEquals(envelope8, chooser.choose)
+  }
+
+  @Test
   def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = {
     val mockMessageChooser = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")