SAMZA-568; allow tasks to override offsets in init method
authorBen Kirwin <ben@kirw.in>
Mon, 16 Mar 2015 23:05:28 +0000 (16:05 -0700)
committerChris Riccomini <criccomini@apache.org>
Mon, 16 Mar 2015 23:05:28 +0000 (16:05 -0700)
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala

index 6d10212..929409e 100644 (file)
@@ -37,4 +37,14 @@ public interface TaskContext {
   Object getStore(String name);
 
   TaskName getTaskName();
+
+  /**
+   * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets
+   * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task
+   * (as returned by {@link #getSystemStreamPartitions()}); trying to set the offset for any other partition
+   * will have no effect.
+   *
+   * NOTE: this feature is experimental, and the API may change in a future release.
+   */
+  void setStartingOffset(SystemStreamPartition ssp, String offset);
 }
index a583ff9..be0b55a 100644 (file)
@@ -65,6 +65,10 @@ class TaskInstance(
       null
     }
     def getTaskName = taskName
+
+    override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
+      offsetManager.startingOffsets += (ssp -> offset)
+    }
   }
 
   def registerMetrics {
index 11eab16..54b4df8 100644 (file)
@@ -40,12 +40,7 @@ import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.WindowableTask
+import org.apache.samza.task._
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.Assertions.intercept
@@ -240,4 +235,54 @@ class TestTaskInstance {
     assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
     assertEquals(1L, getCount(group, classOf[FatalException].getName))
   }
+
+
+  /**
+   * Tests that the init() method of task can override the existing offset
+   * assignment.
+   */
+  @Test
+  def testManualOffsetReset {
+
+    val partition0 = new SystemStreamPartition("system", "stream", new Partition(0))
+    val partition1 = new SystemStreamPartition("system", "stream", new Partition(1))
+
+    val task = new StreamTask with InitableTask {
+
+      override def init(config: Config, context: TaskContext): Unit = {
+
+        assertTrue("Can only update offsets for assigned partition",
+          context.getSystemStreamPartitions.contains(partition1)
+        )
+
+        context.setStartingOffset(partition1, "10")
+      }
+
+      override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {}
+    }
+
+    val chooser = new RoundRobinChooser()
+    val consumers = new SystemConsumers(chooser, consumers = Map.empty)
+    val producers = new SystemProducers(Map.empty, new SerdeManager())
+    val collector = new TaskInstanceCollector(producers)
+
+    val offsetManager = new OffsetManager()
+
+    offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
+
+    val taskInstance = new TaskInstance(
+      task = task,
+      taskName = new TaskName("Offset Reset Task 0"),
+      config = new MapConfig(),
+      metrics = new TaskInstanceMetrics(),
+      consumerMultiplexer = consumers,
+      collector = collector,
+      offsetManager = offsetManager,
+      systemStreamPartitions = Set(partition0, partition1) )
+
+    taskInstance.initTask
+
+    assertEquals(Some("0"), offsetManager.getStartingOffset(partition0))
+    assertEquals(Some("10"), offsetManager.getStartingOffset(partition1))
+  }
 }