SAMZA-437; remove task lifecycle listener
authorChris Riccomini <criccomi@criccomi-mn.linkedin.biz>
Mon, 3 Nov 2014 22:13:58 +0000 (14:13 -0800)
committerChris Riccomini <criccomi@criccomi-mn.linkedin.biz>
Mon, 3 Nov 2014 22:13:58 +0000 (14:13 -0800)
docs/learn/documentation/versioned/container/event-loop.md
docs/learn/documentation/versioned/jobs/configuration-table.html
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java [deleted file]
samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java [deleted file]
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala

index f0f21b0..1162383 100644 (file)
@@ -39,22 +39,10 @@ The event loop works as follows:
 
 The container does this, in a loop, until it is shut down. Note that although there can be multiple task instances within a container (depending on the number of input stream partitions), their process() and window() methods are all called on the same thread, never concurrently on different threads.
 
-### Lifecycle Listeners
+### Lifecycle
 
-Sometimes, you need to run your own code at specific points in a task's lifecycle. For example, you might want to set up some context in the container whenever a new message arrives, or perform some operations on startup or shutdown.
+The only way in which a developer can hook into a SamzaContainer's lifecycle is through the standard InitableTask, ClosableTask, StreamTask, and WindowableTask. In cases where pluggable logic needs to be added to wrap a StreamTask, the StreamTask can be wrapped by another StreamTask implementation that handles the custom logic before calling into the wrapped StreamTask.
 
-To receive notifications when such events happen, you can implement the [TaskLifecycleListenerFactory](../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html) interface. It returns a [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html), whose methods are called by Samza at the appropriate times.
-
-You can then tell Samza to use your lifecycle listener with the following properties in your job configuration:
-
-{% highlight jproperties %}
-# Define a listener called "my-listener" by giving the factory class name
-task.lifecycle.listener.my-listener.class=com.example.foo.MyListenerFactory
-
-# Enable it in this job (multiple listeners can be separated by commas)
-task.lifecycle.listeners=my-listener
-{% endhighlight %}
-
-The Samza container creates one instance of your [TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html). If the container has multiple task instances (processing different input stream partitions), the beforeInit, afterInit, beforeClose and afterClose methods are called for each task instance. The [TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html) argument of those methods gives you more information about the partitions.
+A concrete example is a set of StreamTasks that all want to share the same try/catch logic in their process() method. A StreamTask can be implemented that wraps the original StreamTasks, and surrounds the original process() call with the appropriate try/catch logic. For more details, see [this discussion](https://issues.apache.org/jira/browse/SAMZA-437).
 
 ## [JMX &raquo;](jmx.html)
index e3251a6..e7a4d3c 100644 (file)
                 </tr>
 
                 <tr>
-                    <td class="property" id="task-lifecycle-listener-class">task.lifecycle.listener.<br><span class="listener">listener-name</span>.class</td>
-                    <td class="default"></td>
-                    <td class="description">
-                        Use this property to register a
-                        <a href="../container/event-loop.html#lifecycle-listeners">lifecycle listener</a>, which can receive
-                        a notification when a container starts up or shuts down, or when a message is processed.
-                        The value is the fully-qualified name of a Java class that implements
-                        <a href="../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html">TaskLifecycleListenerFactory</a>.
-                        You can define multiple lifecycle listeners, each with a different <span class="listener">listener-name</span>,
-                        and reference them in <a href="#task-lifecycle-listeners" class="property">task.lifecycle.listeners</a>.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" id="task-lifecycle-listeners">task.lifecycle.listeners</td>
-                    <td class="default"></td>
-                    <td class="description">
-                        If you have defined <a href="../container/event-loop.html#lifecycle-listeners">lifecycle listeners</a> with
-                        <a href="#task-lifecycle-listener-class" class="property">task.lifecycle.listener.*.class</a>,
-                        you need to list them here in order to enable them. The value of this property is a
-                        comma-separated list of <span class="listener">listener-name</span> tokens.
-                    </td>
-                </tr>
-
-                <tr>
                     <td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td>
                     <td class="default"></td>
                     <td class="description">
index 35de8cc..6d10212 100644 (file)
@@ -27,7 +27,7 @@ import java.util.Set;
 
 /**
  * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during
- * initialization in an {@link org.apache.samza.task.InitableTask} and during calls to {@link org.apache.samza.task.TaskLifecycleListener}s.
+ * initialization in an {@link org.apache.samza.task.InitableTask}.
  */
 public interface TaskContext {
   MetricsRegistry getMetricsRegistry();
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListener.java
deleted file mode 100644 (file)
index 55524a1..0000000
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.system.IncomingMessageEnvelope;
-
-/**
- * Used to get before/after notifications before initializing/closing all tasks
- * in a given container (JVM/process).
- */
-public interface TaskLifecycleListener {
-  /**
-   * Called before all tasks in TaskRunner are initialized.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's being initialized.
-   */
-  void beforeInit(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are initialized.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's being initialized.
-   */
-  void afterInit(Config config, TaskContext context);
-
-  /**
-   * Called before a message is processed by a task.
-   * 
-   * @param envelope
-   *          The envelope to be processed by the StreamTask.
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's about to process a message.
-   */
-  void beforeProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
-
-  /**
-   * Called after a message is processed by a task.
-   * 
-   * @param envelope
-   *          The envelope that was processed by the StreamTask.
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that just processed a message.
-   */
-  void afterProcess(IncomingMessageEnvelope envelope, Config config, TaskContext context);
-
-  /**
-   * Called before all tasks in TaskRunner are closed.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that's about to be shutdown.
-   */
-  void beforeClose(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are closed.
-   * 
-   * @param config
-   *          Config for the Samza job.
-   * @param context
-   *          TaskContext for the StreamTask that was just shutdown.
-   */
-  void afterClose(Config config, TaskContext context);
-}
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
deleted file mode 100644 (file)
index 5ed7054..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.config.Config;
-
-/**
- * Build a {@link org.apache.samza.task.TaskLifecycleListener}
- */
-public interface TaskLifecycleListenerFactory {
-  TaskLifecycleListener getLifecyleListener(String name, Config config);
-}
index 5885a88..2b53440 100644 (file)
@@ -54,8 +54,6 @@ import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.TaskLifecycleListener
-import org.apache.samza.task.TaskLifecycleListenerFactory
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
@@ -340,24 +338,6 @@ object SamzaContainer extends Logging {
       metrics = systemProducersMetrics,
       dropSerializationError = dropSerializationError)
 
-    val listeners = config.getLifecycleListeners match {
-      case Some(listeners) => {
-        listeners.split(",").map(listenerName => {
-          info("Loading lifecycle listener: %s" format listenerName)
-
-          val listenerClassName = config.getLifecycleListenerClass(listenerName).getOrElse(throw new SamzaException("Referencing missing listener %s in config" format listenerName))
-
-          Util.getObj[TaskLifecycleListenerFactory](listenerClassName)
-            .getLifecyleListener(listenerName, config)
-        }).toList
-      }
-      case _ => {
-        info("No lifecycle listeners found")
-
-        List[TaskLifecycleListener]()
-      }
-    }
-
     // TODO not sure how we should make this config based, or not. Kind of
     // strange, since it has some dynamic directories when used with YARN.
     val storeBaseDir = new File(System.getProperty("user.dir"), "state")
@@ -481,7 +461,6 @@ object SamzaContainer extends Logging {
         offsetManager = offsetManager,
         storageManager = storageManager,
         reporters = reporters,
-        listeners = listeners,
         systemStreamPartitions = systemStreamPartitions,
         exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config))
 
index 66f7dbe..327299b 100644 (file)
@@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.WindowableTask
-import org.apache.samza.task.TaskLifecycleListener
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.TaskInstanceCollector
@@ -50,7 +49,6 @@ class TaskInstance(
   offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
-  listeners: Seq[TaskLifecycleListener] = Seq(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
@@ -92,8 +90,6 @@ class TaskInstance(
   }
 
   def initTask {
-    listeners.foreach(_.beforeInit(config, context))
-
     if (isInitableTask) {
       debug("Initializing task for taskName: %s" format taskName)
 
@@ -101,8 +97,6 @@ class TaskInstance(
     } else {
       debug("Skipping task initialization for taskName: %s" format taskName)
     }
-
-    listeners.foreach(_.afterInit(config, context))
   }
 
   def registerProducers {
@@ -129,16 +123,12 @@ class TaskInstance(
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
     metrics.processes.inc
 
-    listeners.foreach(_.beforeProcess(envelope, config, context))
-
     trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
 
     exceptionHandler.maybeHandle {
       task.process(envelope, collector, coordinator)
     }
 
-    listeners.foreach(_.afterProcess(envelope, config, context))
-
     trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
     offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
@@ -173,8 +163,6 @@ class TaskInstance(
   }
 
   def shutdownTask {
-    listeners.foreach(_.beforeClose(config, context))
-
     if (task.isInstanceOf[ClosableTask]) {
       debug("Shutting down stream task for taskName: %s" format taskName)
 
@@ -182,8 +170,6 @@ class TaskInstance(
     } else {
       debug("Skipping stream task shutdown for taskName: %s" format taskName)
     }
-
-    listeners.foreach(_.afterClose(config, context))
   }
 
   def shutdownStores {