Address review comments. 908/head
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Thu, 7 Feb 2019 00:18:01 +0000 (16:18 -0800)
committerShanthoosh Venkataraman <spvenkat@usc.edu>
Thu, 7 Feb 2019 02:03:11 +0000 (18:03 -0800)
samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala

index 2302f49..0732311 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MockMessageChooser
 import org.apache.samza.util.BlockingEnvelopeMap
+import org.mockito.Mockito
 
 import scala.collection.JavaConverters._
 
@@ -340,39 +341,22 @@ class TestSystemConsumers {
     assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
-  def testSystemConsumersAndStartpointVisitor {
+  @Test
+  def testSystemConsumersRegistration {
     val system = "test-system"
     val stream = "some-stream"
     val systemStreamPartition1 = new SystemStreamPartition(system, stream, new Partition(1))
     val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
 
-    val consumer = new TestStartpointConsumer
+    val consumer = Mockito.mock(classOf[SystemConsumer])
+    val startpoint = Mockito.mock(classOf[Startpoint])
     val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
       new SerdeManager, new SystemConsumersMetrics,
       SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
       SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
 
-    consumers.register(systemStreamPartition1, "0", null)
-    assertNull(consumer.getVisitedStartpoint(systemStreamPartition1))
-
-    val startpointSpecific = new StartpointSpecific("1")
-    consumers.register(systemStreamPartition2, "0", startpointSpecific)
-    assertEquals(startpointSpecific, consumer.getVisitedStartpoint(systemStreamPartition2))
-
-    val startpointTimestamp = new StartpointTimestamp(123456L)
-    consumers.register(systemStreamPartition2, "0", startpointTimestamp)
-    assertEquals(startpointTimestamp, consumer.getVisitedStartpoint(systemStreamPartition2))
-
-    val startpointOldest = new StartpointOldest
-    consumers.register(systemStreamPartition2, "0", startpointOldest)
-    assertEquals(startpointOldest, consumer.getVisitedStartpoint(systemStreamPartition2))
-
-    val startpointUpcoming = new StartpointUpcoming
-    consumers.register(systemStreamPartition2, "0", startpointUpcoming)
-    assertEquals(startpointUpcoming, consumer.getVisitedStartpoint(systemStreamPartition2))
-
+    consumers.register(systemStreamPartition1, "0", startpoint)
   }
 
   /**
@@ -420,30 +404,6 @@ class TestSystemConsumers {
     def stop {}
     def register { super.register(systemStreamPartition, "0") }
   }
-
-  private class TestStartpointConsumer extends SerializingConsumer with StartpointVisitor {
-    var startpoints: Map[SystemStreamPartition, Startpoint] = Map()
-
-    override def visit(systemStreamPartition: SystemStreamPartition, startpointSpecific: StartpointSpecific) {
-      startpoints += systemStreamPartition -> startpointSpecific
-    }
-
-    override def visit(systemStreamPartition: SystemStreamPartition, startpointTimestamp: StartpointTimestamp) {
-      startpoints += systemStreamPartition -> startpointTimestamp
-    }
-
-    override def visit(systemStreamPartition: SystemStreamPartition, startpointUpcoming: StartpointUpcoming) {
-      startpoints += systemStreamPartition -> startpointUpcoming
-    }
-
-    override def visit(systemStreamPartition: SystemStreamPartition, startpointOldest: StartpointOldest) {
-      startpoints += systemStreamPartition -> startpointOldest
-    }
-
-    def getVisitedStartpoint(systemStreamPartition: SystemStreamPartition) : Startpoint = {
-      startpoints.getOrElse(systemStreamPartition, null)
-    }
-  }
 }
 
 object TestSystemConsumers {