SAMZA-588; expose SamzaContainerContext through TaskContext
authorBenjamin Fradet <benjamin.fradet@gmail.com>
Tue, 24 Mar 2015 19:47:00 +0000 (12:47 -0700)
committerChris Riccomini <criccomini@apache.org>
Tue, 24 Mar 2015 19:47:00 +0000 (12:47 -0700)
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala

index 929409e..5b337a6 100644 (file)
@@ -19,6 +19,7 @@
 
 package org.apache.samza.task;
 
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStreamPartition;
@@ -38,6 +39,8 @@ public interface TaskContext {
 
   TaskName getTaskName();
 
+  SamzaContainerContext getSamzaContainerContext();
+
   /**
    * Set the starting offset for the given {@link org.apache.samza.system.SystemStreamPartition}. Offsets
    * can only be set for a {@link org.apache.samza.system.SystemStreamPartition} assigned to this task
index 9fc3b55..5416dd6 100644 (file)
@@ -414,7 +414,7 @@ object SamzaContainer extends Logging {
     // Increment by 1 because partition starts from 0, but we need the absolute count,
     // this value is used for change log topic creation.
     val maxChangeLogStreamPartitions = containerModel.getTasks.values
-            .max(Ordering.by{task:TaskModel => task.getChangelogPartition.getPartitionId})
+            .max(Ordering.by { task:TaskModel => task.getChangelogPartition.getPartitionId })
             .getChangelogPartition.getPartitionId + 1
 
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
@@ -494,6 +494,7 @@ object SamzaContainer extends Logging {
         metrics = taskInstanceMetrics,
         consumerMultiplexer = consumerMultiplexer,
         collector = collector,
+        containerContext = containerContext,
         offsetManager = offsetManager,
         storageManager = storageManager,
         reporters = reporters,
index be0b55a..c5a5ea5 100644 (file)
@@ -46,6 +46,7 @@ class TaskInstance(
   metrics: TaskInstanceMetrics,
   consumerMultiplexer: SystemConsumers,
   collector: TaskInstanceCollector,
+  containerContext: SamzaContainerContext,
   offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
@@ -65,6 +66,7 @@ class TaskInstance(
       null
     }
     def getTaskName = taskName
+    def getSamzaContainerContext = containerContext
 
     override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
       offsetManager.startingOffsets += (ssp -> offset)
index 81742bc..cab31ca 100644 (file)
@@ -121,13 +121,16 @@ class TestSamzaContainer extends AssertionsForJUnit {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
       config,
       new TaskInstanceMetrics,
       consumerMultiplexer,
-      collector)
+      collector,
+      containerContext
+    )
     val runLoop = new RunLoop(
       taskInstances = Map(taskName -> taskInstance),
       consumerMultiplexer = consumerMultiplexer,
index 54b4df8..7caad28 100644 (file)
@@ -69,6 +69,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -76,6 +77,7 @@ class TestTaskInstance {
       new TaskInstanceMetrics,
       consumerMultiplexer,
       collector,
+      containerContext,
       offsetManager)
     // Pretend we got a message with offset 2 and next offset 3.
     val coordinator = new ReadableCoordinator(taskName)
@@ -159,6 +161,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -169,6 +172,7 @@ class TestTaskInstance {
       taskMetrics,
       consumerMultiplexer,
       collector,
+      containerContext,
       offsetManager,
       exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
 
@@ -211,6 +215,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -221,6 +226,7 @@ class TestTaskInstance {
       taskMetrics,
       consumerMultiplexer,
       collector,
+      containerContext,
       offsetManager,
       exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
 
@@ -261,23 +267,28 @@ class TestTaskInstance {
       override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {}
     }
 
+    val config = new MapConfig()
     val chooser = new RoundRobinChooser()
     val consumers = new SystemConsumers(chooser, consumers = Map.empty)
     val producers = new SystemProducers(Map.empty, new SerdeManager())
+    val metrics = new TaskInstanceMetrics()
+    val taskName = new TaskName("Offset Reset Task 0")
     val collector = new TaskInstanceCollector(producers)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
 
     val offsetManager = new OffsetManager()
 
     offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
 
     val taskInstance = new TaskInstance(
-      task = task,
-      taskName = new TaskName("Offset Reset Task 0"),
-      config = new MapConfig(),
-      metrics = new TaskInstanceMetrics(),
-      consumerMultiplexer = consumers,
-      collector = collector,
-      offsetManager = offsetManager,
+      task,
+      taskName,
+      config,
+      metrics,
+      consumers,
+      collector,
+      containerContext,
+      offsetManager,
       systemStreamPartitions = Set(partition0, partition1) )
 
     taskInstance.initTask