SAMZA-1004: Fix some logging and javadoc issues for AsyncStreamTask
authorXinyu Liu <xiliu@linkedin.com>
Wed, 7 Sep 2016 22:29:37 +0000 (15:29 -0700)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Wed, 7 Sep 2016 22:29:37 +0000 (15:29 -0700)
samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala

index 684ba0b..de68c79 100644 (file)
@@ -22,7 +22,7 @@ package org.apache.samza.task;
 import org.apache.samza.system.IncomingMessageEnvelope;
 
 /**
- * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. Its provided for better
+ * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. It's provided for better
  * parallelism and resource utilization. This class allows task to make asynchronous calls and fire callbacks upon completion.
  * Similar to {@link StreamTask}, an AsyncStreamTask may be augmented by implementing other interfaces, such as
  * {@link InitableTask}, {@link WindowableTask}, or {@link ClosableTask}. The following invariants hold with these mix-ins:
@@ -33,8 +33,8 @@ import org.apache.samza.system.IncomingMessageEnvelope;
  * CloseableTask.close - always the last method invoked on an AsyncStreamTask and all other AsyncStreamTask are guaranteed
  * to happen-before it.
  *
- * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.process.max.inflight.messages=1),
- * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.process.max.inflight.messages&gt;1),
+ * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.max.concurrency=1),
+ * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.max.concurrency&gt;1),
  * there is no such happens-before constraint and the AsyncStreamTask is required to coordinate any shared state.
  *
  * WindowableTask.window - in either above mode, it is called when no invocations to processAsync are pending and no new
@@ -57,4 +57,4 @@ public interface AsyncStreamTask {
    * @param callback Triggers the completion of the process.
    */
   void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback);
-}
\ No newline at end of file
+}
index f786fc0..4ab4bce 100644 (file)
@@ -222,7 +222,9 @@ object SamzaContainer extends Logging {
     info("Got system consumers: %s" format consumers.keys)
 
     val isAsyncTask = classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
-    info("%s is AsyncStreamTask" format taskClassName)
+    if (isAsyncTask) {
+      info("%s is AsyncStreamTask" format taskClassName)
+    }
 
     val producers = systemFactories
       .map {