Merge branch 'master' into 0.14.0
authorYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Wed, 9 Aug 2017 17:39:55 +0000 (10:39 -0700)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Wed, 9 Aug 2017 17:39:55 +0000 (10:39 -0700)
Conflicts:
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java

1  2 
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala

@@@ -67,10 -69,10 +68,10 @@@ public class StreamProcessor 
  
    private volatile SamzaContainer container = null;
    private volatile Throwable containerException = null;
--  
++
    // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
    // stopped due to re-balancing
-   private volatile CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
+   /* package private */volatile CountDownLatch jcContainerShutdownLatch;
    private volatile boolean processorOnStartCalled = false;
  
    @VisibleForTesting
  
        @Override
        public void onNewJobModel(String processorId, JobModel jobModel) {
-         if (!jobModel.getContainers().containsKey(processorId)) {
-           LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor.");
-           stop();
-         } else {
-           jcContainerShutdownLatch = new CountDownLatch(1);
-           SamzaContainerListener containerListener = new SamzaContainerListener() {
-             @Override
-             public void onContainerStart() {
-               if (!processorOnStartCalled) {
-                 // processorListener is called on start only the first time the container starts.
-                 // It is not called after every re-balance of partitions among the processors
-                 processorOnStartCalled = true;
-                 if (processorListener != null) {
-                   processorListener.onStart();
-                 }
-               } else {
-                 LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
-               }
-             }
+         jcContainerShutdownLatch = new CountDownLatch(1);
  
-             @Override
-             public void onContainerStop(boolean pauseByJm) {
-               if (pauseByJm) {
-                 LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
-                 if (jcContainerShutdownLatch != null) {
-                   jcContainerShutdownLatch.countDown();
-                 }
-               } else {  // sp.stop was called or container stopped by itself
-                 LOGGER.info("Container " + container.toString() + " stopped.");
-                 container = null; // this guarantees that stop() doesn't try to stop container again
-                 stop();
+         SamzaContainerListener containerListener = new SamzaContainerListener() {
+           @Override
+           public void onContainerStart() {
+             if (!processorOnStartCalled) {
+               // processorListener is called on start only the first time the container starts.
+               // It is not called after every re-balance of partitions among the processors
+               processorOnStartCalled = true;
+               if (processorListener != null) {
+                 processorListener.onStart();
                }
+             } else {
+               LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
              }
+           }
  
-             @Override
-             public void onContainerFailed(Throwable t) {
+           @Override
+           public void onContainerStop(boolean pauseByJm) {
+             if (pauseByJm) {
+               LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
                if (jcContainerShutdownLatch != null) {
                  jcContainerShutdownLatch.countDown();
-               } else {
-                 LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
                }
-               containerException = t;
-               LOGGER.error("Container failed. Stopping the processor.", containerException);
-               container = null;
+             } else {  // sp.stop was called or container stopped by itself
+               LOGGER.info("Container " + container.toString() + " stopped.");
+               container = null; // this guarantees that stop() doesn't try to stop container again
                stop();
              }
-           };
+           }
  
-           container = createSamzaContainer(processorId, jobModel);
-           container.setContainerListener(containerListener);
-           LOGGER.info("Starting container " + container.toString());
-           executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-               .setNameFormat("p-" + processorId + "-container-thread-%d").build());
-           executorService.submit(container::run);
-         }
+           @Override
+           public void onContainerFailed(Throwable t) {
+             if (jcContainerShutdownLatch != null) {
+               jcContainerShutdownLatch.countDown();
+             } else {
+               LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+             }
+             containerException = t;
+             LOGGER.error("Container failed. Stopping the processor.", containerException);
+             container = null;
+             stop();
+           }
+         };
 -        container = createSamzaContainer(
 -            jobModel.getContainers().get(processorId),
 -            jobModel.maxChangeLogStreamPartitions);
++        container = createSamzaContainer(processorId, jobModel);
+         container.setContainerListener(containerListener);
+         LOGGER.info("Starting container " + container.toString());
+         executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+             .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+         executorService.submit(container::run);
        }
  
        @Override