SAMZA-2083: Adding the register(SSP, startpoint) API to the SystemConsumer.
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Mon, 4 Feb 2019 19:01:40 +0000 (11:01 -0800)
committerShanthoosh Venkataraman <spvenkat@usc.edu>
Wed, 6 Feb 2019 21:11:29 +0000 (13:11 -0800)
samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala

index 5b4259b..c5b3031 100644 (file)
@@ -22,6 +22,8 @@ package org.apache.samza.system;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.startpoint.Startpoint;
 
 /**
  * <p>
@@ -29,7 +31,7 @@ import java.util.Set;
  * wishes to integrate with Samza. Examples of systems that one might want to
  * integrate would be systems like Kafka, Hadoop, Kestrel, SQS, etc.
  * </p>
- * 
+ *
  * <p>
  * SamzaContainer uses SystemConsumer to read messages from the underlying
  * system, and funnels the messages to the appropriate StreamTask instances. The
@@ -38,11 +40,11 @@ import java.util.Set;
  * repeat. If no IncomingMessageEnvelopes are returned, the SamzaContainer polls
  * again, but with a timeout of 10ms.
  * </p>
- * 
+ *
  * <p>
  * The SamzaContainer treats SystemConsumer in the following way:
  * </p>
- * 
+ *
  * <ul>
  * <li>Start will be called before stop.</li>
  * <li>Register will be called one or more times before start.</li>
@@ -63,17 +65,17 @@ import java.util.Set;
  * <li>Any exception thrown by the SystemConsumer means that the SamzaContainer
  * should halt.</li>
  * </ul>
- * 
+ *
  * <p>
  * There are generally three implementation styles to this interface:
  * </p>
- * 
+ *
  * <ol>
  * <li>Thread-based</li>
  * <li>Selector-based</li>
  * <li>Synchronous</li>
  * </ol>
- * 
+ *
  * <p>
  * Thread-based implementations typically use a series of threads to read from
  * an underlying system asynchronously, and put the resulting messages into a
@@ -82,12 +84,12 @@ import java.util.Set;
  * BlockingEnvelopeMap is a helper class that makes it easy to implement
  * thread-based implementations of SystemConsumer.
  * </p>
- * 
+ *
  * <p>
  * Selector-based implementations typically setup NIO-based non-blocking socket
  * that can be selected for new data when poll is called.
  * </p>
- * 
+ *
  * <p>
  * Synchronous implementations simply fetch directly from the underlying system
  * whenever poll is invoked. Synchronous implementations must take great care to
@@ -102,7 +104,7 @@ public interface SystemConsumer {
    * SystemStreamPartitions supplied are at head (have no new messages available
    * since the last poll invocation was made for each SystemStreamPartition).
    */
-  public static int BLOCK_ON_OUTSTANDING_MESSAGES = -1;
+  int BLOCK_ON_OUTSTANDING_MESSAGES = -1;
 
   /**
    * Tells the SystemConsumer to connect to the underlying system, and prepare
@@ -122,7 +124,7 @@ public interface SystemConsumer {
    * should try and read messages from all SystemStreamPartitions that are
    * registered to it. SystemStreamPartitions should only be registered before
    * start is called.
-   * 
+   *
    * @param systemStreamPartition
    *          The SystemStreamPartition object representing the Samza
    *          SystemStreamPartition to receive messages from.
@@ -137,9 +139,22 @@ public interface SystemConsumer {
   void register(SystemStreamPartition systemStreamPartition, String offset);
 
   /**
+   * Registers the {@link Startpoint} to the SystemConsumer. SystemConsumer
+   * should read the messages from all the registered SystemStreamPartitions.
+   * SystemStreamPartitions should be registered before the start is called.
+   *
+   * @param systemStreamPartition represents the SystemStreamPartition to be registered.
+   * @param startpoint represents the position in the SystemStreamPartition.
+   */
+  @InterfaceStability.Evolving
+  default void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    throw new UnsupportedOperationException("This operation is not supported.");
+  }
+
+  /**
    * Poll the SystemConsumer to get any available messages from the underlying
    * system.
-   * 
+   *
    * <p>
    * If the underlying implementation does not take care to adhere to the
    * timeout parameter, the SamzaContainer's performance will suffer
@@ -147,7 +162,7 @@ public interface SystemConsumer {
    * will block the entire main thread in SamzaContainer, and no messages will
    * be processed while blocking is occurring.
    * </p>
-   * 
+   *
    * @param systemStreamPartitions
    *          A set of SystemStreamPartition to poll for new messages. If
    *          SystemConsumer has messages available for other registered
index 5eae194..a84c5b8 100644 (file)
@@ -258,7 +258,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getMetadataStoreFactory = getOption(JobConfig.METADATA_STORE_FACTORY).getOrElse(classOf[CoordinatorStreamMetadataStoreFactory].getCanonicalName)
 
-  def getStartpointMetadataStoreFactory = getOption(JobConfig.STARTPOINT_METADATA_STORE_FACTORY).getOrElse(getMetadataStoreFactory)
+  def getStartpointMetadataStoreFactory = getOption(JobConfig.STARTPOINT_METADATA_STORE_FACTORY).getOrElse(null)
 
   def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }
 
index 2c9a3ea..68b2f09 100644 (file)
@@ -21,19 +21,19 @@ package org.apache.samza.system
 
 
 import java.util
+import java.util.ArrayDeque
 import java.util.concurrent.TimeUnit
+import java.util.Collections
+import java.util.HashMap
+import java.util.HashSet
+import java.util.Queue
+import java.util.Set
 import scala.collection.JavaConverters._
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.util.{Logging, TimerUtil}
-import org.apache.samza.startpoint.{Startpoint, StartpointVisitor}
+import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
-import java.util.ArrayDeque
-import java.util.Collections
-import java.util.HashSet
-import java.util.HashMap
-import java.util.Queue
-import java.util.Set
 
 
 object SystemConsumers {
@@ -205,8 +205,8 @@ class SystemConsumers (
 
     try {
       val consumer = consumers(systemStreamPartition.getSystem)
-      if (startpoint != null && consumer.isInstanceOf[StartpointVisitor]) {
-        startpoint.apply(systemStreamPartition, consumer.asInstanceOf[StartpointVisitor])
+      if (startpoint != null) {
+        consumer.register(systemStreamPartition, startpoint)
       } else {
         consumer.register(systemStreamPartition, offset)
       }
index 3da564e..a7a9c8b 100644 (file)
@@ -616,7 +616,7 @@ class TestTaskStorageManager extends MockitoSugar {
     })
 
     val mockSystemConsumer = mock[SystemConsumer]
-    when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
+    when(mockSystemConsumer.register(any(classOf[SystemStreamPartition]), any(classOf[String]))).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         val args = invocation.getArguments
         if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) {
index 2941236..d4f4b1e 100644 (file)
@@ -340,7 +340,7 @@ class TestSystemConsumers {
     assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
   }
 
-  @Test
+  @Test(expected = classOf[Exception])
   def testSystemConsumersAndStartpointVisitor {
     val system = "test-system"
     val stream = "some-stream"