Fix log messages from StreamProcessor(onJobModelExpired event).
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Tue, 25 Jul 2017 17:35:31 +0000 (10:35 -0700)
committernavina <navina@apache.org>
Tue, 25 Jul 2017 17:35:31 +0000 (10:35 -0700)
Log messages published in onJobModelExpired event have `processorId` as null. `processorId` is cached as final var in jobCoordinatorListener method. JLS for final fields/variables states that they're initialized before the constructor. This sets local final variable copy as null(since it relies upon value of instance variable to be set in constructor).
Changes
* Use processorId directly in `createCoordinatorListener` method.
* Remove StreamProcessor.toString since it has no usages.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #249 from shanthoosh/fix_logging_in_stream_processor

samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java

index 89edd16..590fa11 100644 (file)
@@ -111,11 +111,6 @@ public class StreamProcessor {
     this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
   }
 
-  @Override
-  public String toString() {
-    return "Processor:" + processorId;
-  }
-
   /* package private */
   JobCoordinator getJobCoordinator() {
     return Util.
@@ -210,7 +205,6 @@ public class StreamProcessor {
   }
 
   JobCoordinatorListener createJobCoordinatorListener() {
-    final String pid = this.toString();
     return new JobCoordinatorListener() {
 
       @Override
@@ -220,7 +214,7 @@ public class StreamProcessor {
           if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
             boolean shutdownComplete = false;
             try {
-              LOGGER.info("Shutting down container in onJobModelExpired for processor:" + pid);
+              LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId);
               container.pause();
               shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
               LOGGER.info("ShutdownComplete=" + shutdownComplete);
@@ -231,7 +225,7 @@ public class StreamProcessor {
             } catch (InterruptedException e) {
               LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e);
             }
-            LOGGER.info("Shutting down container done for pid=" + pid + "; complete =" + shutdownComplete);
+            LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete);
             if (!shutdownComplete) {
               LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " +
                   "Stopping the processor.");