SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and SamzaCo...
author Navina Ramesh <navina@apache.org>
Wed, 3 May 2017 22:10:13 +0000 (15:10 -0700)
committer nramesh <nramesh@linkedin.com>
Wed, 3 May 2017 22:10:13 +0000 (15:10 -0700)
See SAMZA-1212 for motivation toward this refactoring.
Changes here are:
* Removed awaitStart (blocking) method in StreamProcessor, JobCoordinator and SamzaContainer
* Introduced SamzaContainerListener and JobCoordinatorListener interfaces, both implemented by StreamProcessor
* Introduced SamzaContainerStatus to handle failures and lifecycle using Listener interfaces

Author: Navina Ramesh <navina@apache.org>

Reviewers: Xinyu Liu <xiliu@linkedin.com>, Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #148 from navina/SAMZA-1212

24 files changed:
samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java [deleted file]
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala [new file with mode: 0644]
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala

diff --git a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java b/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
new file mode 100644 (file)
index 0000000..4565de6
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza;
+
+
+/**
+ * <pre>
+ *                                                                   runloop completed [OR]
+ *                  container.run()           runloop.run            container.shutdown()
+ *    NOT_STARTED -----------------> STARTING ------------> STARTED -----------------------> STOPPED
+ *                                       |      Error in runloop |
+ *                                       |      [OR] Error when  |
+ *                           Error when  |      stopping         |
+ *                   starting components |      components       |
+ *                                       V                       |
+ *                                    FAILED <-------------------|
+ * </pre>
+ */
+
+/**
+ * Indicates the current status of a {@link org.apache.samza.container.SamzaContainer}
+ */
+public enum  SamzaContainerStatus {
+  /**
+   * Indicates that the container has not been started
+   */
+  NOT_STARTED,
+
+  /**
+   * Indicates that the container is starting all the components required by the
+   * {@link org.apache.samza.container.RunLoop} for processing
+   */
+  STARTING,
+
+  /**
+   * Indicates that the container started the {@link org.apache.samza.container.RunLoop}
+   */
+  STARTED,
+
+  /**
+   * Indicates that the container was successfully stopped either due to task-initiated shutdown
+   * (eg. end-of-stream triggered shutdown or application-driven shutdown of all tasks and hence, the container) or
+   * due to external shutdown requests (eg. from {@link org.apache.samza.processor.StreamProcessor})
+   */
+  STOPPED,
+
+  /**
+   * Indicates that the container failed while starting its components ({@link #STARTING}), or while running the
+   * run loop or stopping its components after reaching {@link #STARTED}
+   */
+  FAILED
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
new file mode 100644 (file)
index 0000000..a9c3b2c
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+/**
+ * A Listener for {@link org.apache.samza.container.SamzaContainer} lifecycle events.
+ */
+public interface SamzaContainerListener {
+
+  /**
+   *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to
+   *  the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the
+   *  {@link org.apache.samza.container.RunLoop}
+   */
+  void onContainerStart();
+
+  /**
+   *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to
+   *  {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
+   *  {@link org.apache.samza.SamzaContainerStatus}
+   *  <br>
+   *  <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
+   *  exceptions/errors.
+   * @param pausedByJm boolean indicating why the container was stopped. It should be {@literal true}, iff the container
+   *                    was stopped as a result of an expired {@link org.apache.samza.job.model.JobModel}. Otherwise,
+   *                    it should be {@literal false}
+   */
+  void onContainerStop(boolean pausedByJm);
+
+  /**
+   *  Method invoked when the {@link org.apache.samza.container.SamzaContainer} has transitioned to
+   *  {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
+   *  {@link org.apache.samza.SamzaContainerStatus}
+   *  <br>
+   *  <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive with {@link #onContainerStop(boolean)}.
+   * @param t Throwable that caused the container failure.
+   */
+  void onContainerFailed(Throwable t);
+}
index af2ef6a..bd06039 100644 (file)
@@ -28,39 +28,41 @@ import org.apache.samza.job.model.JobModel;
  *  based on the underlying environment. In some cases, ID assignment is completely config driven, while in other
  *  cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors.
  *
- *  This interface contains methods required for the StreamProcessor to interact with JobCoordinator.
+ *  StreamProcessor registers a {@link JobCoordinatorListener} in order to get notified about JobModel changes and
+ *  Coordinator state change.
+ *
+ * <pre>
+ *   {@code
+ *  *******************  start()                            ********************
+ *  *                 *----------------------------------->>*                  *
+ *  *                 *         onNewJobModel    ************                  *
+ *  *                 *<<------------------------* Job      *                  *
+ *  *                 *     onJobModelExpired    * Co-      *                  *
+ *  *                 *<<------------------------* ordinator*                  *
+ *  * StreamProcessor *     onCoordinatorStop    * Listener *  JobCoordinator  *
+ *  *                 *<<------------------------*          *                  *
+ *  *                 *  onCoordinatorFailure    *          *                  *
+ *  *                 *<<------------------------************                  *
+ *  *                 *  stop()                             *                  *
+ *  *                 *----------------------------------->>*                  *
+ *  *******************                                     ********************
+ *  }
+ *  </pre>
  */
 @InterfaceStability.Evolving
 public interface JobCoordinator {
   /**
-   * Starts the JobCoordinator which involves one or more of the following:
-   * * LeaderElector Module initialization, if any
-   * * If leader, generate JobModel. Else, read JobModel
+   * Starts the JobCoordinator, which generally consists of participating in LeaderElection and listening for JobModel
+   * changes.
    */
   void start();
 
   /**
-   * Cleanly shutting down the JobCoordinator involves:
-   * * Shutting down the Container
-   * * Shutting down the LeaderElection module (TBD: details depending on leader or not)
+   * Stops the JobCoordinator and notifies the registered {@link JobCoordinatorListener}, if any
    */
   void stop();
 
   /**
-   * Waits for a specified amount of time for the JobCoordinator to fully start-up, which means it should be ready to
-   * process messages.
-   * In a Standalone use-case, it may be sufficient to wait for the container to start-up.
-   * In a ZK based Standalone use-case, it also includes registration with ZK, initialization of the
-   * leader elector module, container start-up etc.
-   *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   * @return {@code true}, if the JobCoordinator is started within the specified wait time and {@code false} if the
-   * waiting time elapsed
-   * @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up
-   */
-  boolean awaitStart(long timeoutMs) throws InterruptedException;
-
-  /**
    * Returns the identifier assigned to the processor that is local to the instance of StreamProcessor.
    *
    * The semantics and format of the identifier returned should adhere to the specification defined in
@@ -71,6 +73,13 @@ public interface JobCoordinator {
   String getProcessorId();
 
   /**
+   * Registers a {@link JobCoordinatorListener} to receive notification on coordinator state changes and job model changes
+   *
+   * @param listener An instance of {@link JobCoordinatorListener}
+   */
+  void setListener(JobCoordinatorListener listener);
+
+  /**
    * Returns the current JobModel
    * The implementation of the JobCoordinator in the leader needs to know how to read the config and generate JobModel
    * In case of a non-leader, the JobCoordinator should simply fetch the jobmodel
index 7f7e1ed..784d48d 100644 (file)
@@ -20,17 +20,13 @@ package org.apache.samza.coordinator;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
-import org.apache.samza.processor.SamzaContainerController;
-
 
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
-   * @param processorId {@link org.apache.samza.processor.StreamProcessor} id
+   * @param processorId Identifier for {@link org.apache.samza.processor.StreamProcessor} instance
    * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
-   * @param containerController Controller interface for starting and stopping container. In future, it may simply
-   *                            pause the container and add/remove tasks
    * @return An instance of IJobCoordinator
    */
-  JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController);
+  JobCoordinator getJobCoordinator(String processorId, Config config);
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java
new file mode 100644 (file)
index 0000000..8e17032
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.job.model.JobModel;
+
+/**
+ * Listener interface that can be registered with a {@link org.apache.samza.coordinator.JobCoordinator} instance in order
+ * to receive notifications.
+ */
+public interface JobCoordinatorListener {
+  /**
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} in the following scenarios:
+   * <ul>
+   *  <li>the existing {@link JobModel} is no longer valid due to re-balancing</li>
+   *  <li>JobCoordinator is shutting down</li>
+   * </ul>
+   */
+  void onJobModelExpired();
+
+  /**
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when there is new {@link JobModel}
+   * available for use by the processor.
+   *
+   * @param processorId String, representing the identifier of {@link org.apache.samza.processor.StreamProcessor}
+   * @param jobModel Current {@link JobModel} containing a {@link org.apache.samza.job.model.ContainerModel} for the
+   *                 given processorId
+   */
+  // TODO: Can change interface to ContainerModel if maxChangelogStreamPartitions can be made a part of ContainerModel
+  void onNewJobModel(String processorId, JobModel jobModel);
+
+  /**
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when it is shutting down without any errors
+   */
+  void onCoordinatorStop();
+
+  /**
+   *
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} when it is shutting down with an error.
+   * <b>Note</b>: This should be the last call after completely shutting down the JobCoordinator.
+   *
+   * @param t Throwable that was the cause of the JobCoordinator failure
+   */
+  void onCoordinatorFailure(Throwable t);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
deleted file mode 100644 (file)
index 4af413a..0000000
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.processor;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.samza.config.ClusterManagerConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.metrics.JmxServer;
-import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SamzaContainerController {
-  private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class);
-
-  private ExecutorService executorService;
-  private volatile SamzaContainer container;
-  private final Map<String, MetricsReporter> metricsReporterMap;
-  private final Object taskFactory;
-  private final long containerShutdownMs;
-  private final StreamProcessorLifecycleListener lifecycleListener;
-
-  // Internal Member Variables
-  private Future containerFuture;
-
-  /**
-   * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer}
-   * Requests to execute a container are submitted to the {@link ExecutorService}
-   *
-   * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
-   *                            {@link org.apache.samza.task.AsyncStreamTask}
-   * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
-   * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
-   * @param lifecycleListener {@link StreamProcessorLifecycleListener}
-   */
-  public SamzaContainerController(
-      Object taskFactory,
-      long containerShutdownMs,
-      Map<String, MetricsReporter> metricsReporterMap,
-      StreamProcessorLifecycleListener lifecycleListener) {
-    this.taskFactory = taskFactory;
-    this.metricsReporterMap = metricsReporterMap;
-    if (containerShutdownMs == -1) {
-      this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS;
-    } else {
-      this.containerShutdownMs = containerShutdownMs;
-    }
-    // life cycle callbacks when shutdown and failure happens
-    this.lifecycleListener = lifecycleListener;
-  }
-
-  /**
-   * Instantiates a container and submits to the executor. This method does not actually wait for the container to
-   * fully start-up. For such a behavior, see {@link #awaitStart(long)}
-   * <p>
-   * <b>Note:</b> <i>This method does not stop a currently running container, if any. It is left up to the caller to
-   * ensure that the container has been stopped with stopContainer before invoking this method.</i>
-   *
-   * @param containerModel               {@link ContainerModel} instance to use for the current run of the Container
-   * @param config                       Complete configuration map used by the Samza job
-   * @param maxChangelogStreamPartitions Max number of partitions expected in the changelog streams
-   *                                     TODO: Try to get rid of maxChangelogStreamPartitions from method arguments
-   */
-  public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
-    LocalityManager localityManager = null;
-    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
-      localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), config);
-    }
-    log.info("About to create container: " + containerModel.getProcessorId());
-    container = SamzaContainer$.MODULE$.apply(
-        containerModel.getProcessorId(),
-        containerModel,
-        config,
-        maxChangelogStreamPartitions,
-        localityManager,
-        new JmxServer(),
-        Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
-        taskFactory);
-    log.info("About to start container: " + containerModel.getProcessorId());
-    executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("p-" + containerModel.getProcessorId() + "-container-thread-%d").build());
-    containerFuture = executorService.submit(() -> {
-        try {
-          container.run();
-          lifecycleListener.onShutdown();
-        } catch (Throwable t) {
-          lifecycleListener.onFailure(t);
-        }
-      });
-  }
-
-  /**
-   * Method waits for a specified amount of time for the container to fully start-up, which consists of class-loading
-   * all the components and start message processing
-   *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting
-   * time elapsed
-   * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up
-   */
-  public boolean awaitStart(long timeoutMs) throws InterruptedException {
-    return container.awaitStart(timeoutMs);
-  }
-
-  /**
-   * Stops a running container, if any. Invoking this method multiple times does not have any side-effects.
-   */
-  public void stopContainer() {
-    if (container == null) {
-      log.warn("Shutdown before a container was created.");
-      return;
-    }
-
-    container.shutdown();
-    try {
-      if (containerFuture != null)
-        containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException | ExecutionException e) {
-      log.error("Ran into problems while trying to stop the container in the processor!", e);
-    } catch (TimeoutException e) {
-      log.warn("Got Timeout Exception while trying to stop the container in the processor! The processor may not shutdown properly", e);
-    }
-  }
-
-  /**
-   * Shutsdown the controller by first stop any running container and then, shutting down the {@link ExecutorService}
-   */
-  public void shutdown() {
-    stopContainer();
-    if (executorService != null) {
-      executorService.shutdown();
-    }
-  }
-}
index 1910594..6329f6c 100644 (file)
  */
 package org.apache.samza.processor;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.IllegalContainerStateException;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
  * independent process.
  * <p>
- * <b>Usage Example:</b>
- * <pre>
- * StreamProcessor processor = new StreamProcessor(1, config);
- * processor.start();
- * try {
- *  boolean status = processor.awaitStart(TIMEOUT_MS);    // Optional - blocking call
- *  if (!status) {
- *    // Timed out
- *  }
- *  ...
- * } catch (InterruptedException ie) {
- *   ...
- * } finally {
- *   processor.stop();
- * }
- * </pre>
- * Note: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in
+ *
+ * <b>Note</b>: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in
  * multiple threads.
  */
 @InterfaceStability.Evolving
 public class StreamProcessor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
+
   private final JobCoordinator jobCoordinator;
-  private final StreamProcessorLifecycleListener lifecycleListener;
-  private final String processorId;
+  private final StreamProcessorLifecycleListener processorListener;
+  private final Object taskFactory;
+  private final Map<String, MetricsReporter> customMetricsReporter;
+  private final Config config;
+  private final long taskShutdownMs;
+
+  private ExecutorService executorService;
+
+  private volatile SamzaContainer container = null;
+  // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
+  // stopped due to re-balancing
+  private volatile CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
+  private volatile boolean processorOnStartCalled = false;
+
+  @VisibleForTesting
+  JobCoordinatorListener jobCoordinatorListener = null;
 
   /**
    * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container
@@ -70,84 +87,246 @@ public class StreamProcessor {
    * <p>
    * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
    *
+   * @param processorId            String identifier for this processor
    * @param config                 Instance of config object - contains all configuration required for processing
    * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
-   * @param lifecycleListener         listener to the StreamProcessor life cycle
+   * @param processorListener         listener to the StreamProcessor life cycle
    */
   public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
-                         AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) {
-    this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, lifecycleListener);
+                         AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
+    this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener);
   }
 
-
   /**
    *Same as {@link #StreamProcessor(String, Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
    * instances are created using the provided {@link StreamTaskFactory}.
    * @param config - config
    * @param customMetricsReporters metric Reporter
    * @param streamTaskFactory task factory to instantiate the Task
-   * @param lifecycleListener  listener to the StreamProcessor life cycle
+   * @param processorListener  listener to the StreamProcessor life cycle
    */
   public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
-                         StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) {
-    this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, lifecycleListener);
+                         StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) {
+    this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, processorListener);
   }
 
-  private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
-                          Object taskFactory, StreamProcessorLifecycleListener lifecycleListener) {
-    this.processorId = processorId;
-
-    SamzaContainerController containerController = new SamzaContainerController(
-        taskFactory,
-        new TaskConfigJava(config).getShutdownMs(),
-        customMetricsReporters,
-        lifecycleListener);
-
-    this.jobCoordinator = Util.
+  /* package private */
+  JobCoordinator getJobCoordinator(String processorId) {
+    return Util.
         <JobCoordinatorFactory>getObj(
             new JobCoordinatorConfig(config)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, config, containerController);
+        .getJobCoordinator(processorId, config);
+  }
 
-    this.lifecycleListener = lifecycleListener;
+  @VisibleForTesting
+  StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory,
+                  StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
+    this.taskFactory = taskFactory;
+    this.config = config;
+    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
+    this.customMetricsReporter = customMetricsReporters;
+    this.processorListener = processorListener;
+    this.jobCoordinator = jobCoordinator;
+    this.jobCoordinatorListener = createJobCoordinatorListener();
+    this.jobCoordinator.setListener(jobCoordinatorListener);
+  }
+
+  private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+                          Object taskFactory, StreamProcessorLifecycleListener processorListener) {
+    this.taskFactory = taskFactory;
+    this.config = config;
+    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
+    this.customMetricsReporter = customMetricsReporters;
+    this.processorListener = processorListener;
+    this.jobCoordinator = getJobCoordinator(processorId);
+    this.jobCoordinator.setListener(createJobCoordinatorListener());
   }
 
   /**
-   * StreamProcessor Lifecycle: start()
-   * <ul>
-   * <li>Starts the JobCoordinator and fetches the JobModel</li>
-   * <li>jobCoordinator.start returns after starting the container using ContainerModel </li>
-   * </ul>
-   * When start() returns, it only guarantees that the container is initialized and submitted by the controller to
-   * execute
+   * Asynchronously starts this {@link StreamProcessor}.
+   * <p>
+   *   <b>Implementation</b>:
+   *   Starts the {@link JobCoordinator}, which will eventually start the {@link SamzaContainer} when a new
+   *   {@link JobModel} is available.
+   * </p>
    */
   public void start() {
     jobCoordinator.start();
-    lifecycleListener.onStart();
   }
 
   /**
-   * Method that allows the user to wait for a specified amount of time for the container to initialize and start
-   * processing messages
+   * <p>
+   * Asynchronously stops the {@link StreamProcessor}'s running components - {@link SamzaContainer}
+   * and {@link JobCoordinator}
+   * </p>
+   * There are multiple ways in which the StreamProcessor stops:
+   * <ol>
+   *   <li>Caller of StreamProcessor invokes stop()</li>
+   *   <li>Samza Container completes processing (eg. bounded input) and shuts down</li>
+   *   <li>Samza Container fails</li>
+   *   <li>Job Coordinator fails</li>
+   * </ol>
+   * When either the container or the coordinator stops (cleanly or due to an exception), it will try to shut
+   * down the StreamProcessor. This needs to be synchronized so that only one code path gets triggered for shutdown.
+   * <br>
+   * If the container is running, one of the following happens:
+   * <ol>
+   *   <li>the container shuts down cleanly and {@link SamzaContainerListener#onContainerStop(boolean)} triggers
+   *   {@link JobCoordinator#stop()}</li>
+   *   <li>the container fails to shut down cleanly and {@link SamzaContainerListener#onContainerFailed(Throwable)}
+   *   triggers {@link JobCoordinator#stop()}</li>
+   * </ol>
+   * If the container is not running, this method simply shuts down the {@link JobCoordinator}.
    *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   * @return {@code true}, if the container started within the specified wait time and {@code false} if the waiting time
-   * elapsed
-   * @throws InterruptedException if the current thread is interrupted while waiting for container to start-up
    */
-  public boolean awaitStart(long timeoutMs) throws InterruptedException {
-    return jobCoordinator.awaitStart(timeoutMs);
+  public synchronized void stop() {
+    boolean containerShutdownInvoked = false;
+    if (container != null) {
+      try {
+        LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor");
+        container.shutdown();
+        containerShutdownInvoked = true;
+      } catch (IllegalContainerStateException icse) {
+        LOGGER.info("Container was not running", icse);
+      }
+    }
+
+    if (!containerShutdownInvoked) {
+      LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
+      jobCoordinator.stop();
+    }
+
   }
 
-  /**
-   * StreamProcessor Lifecycle: stop()
-   * <ul>
-   * <li>Stops the SamzaContainer execution</li>
-   * <li>Stops the JobCoordinator</li>
-   * </ul>
-   */
-  public void stop() {
-    jobCoordinator.stop();
+  SamzaContainer createSamzaContainer(ContainerModel containerModel, int maxChangelogStreamPartitions, JmxServer jmxServer) {
+    return SamzaContainer.apply(
+        containerModel,
+        config,
+        maxChangelogStreamPartitions,
+        jmxServer,
+        Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter),
+        taskFactory);
+  }
+
+  JobCoordinatorListener createJobCoordinatorListener() {
+    return new JobCoordinatorListener() {
+
+      @Override
+      public void onJobModelExpired() {
+        if (container != null) {
+          SamzaContainerStatus status = container.getStatus();
+          if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
+            boolean shutdownComplete = false;
+            try {
+              LOGGER.info("Shutting down container in onJobModelExpired.");
+              container.pause();
+              shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+            } catch (IllegalContainerStateException icse) {
+              // Ignored since container is not running
+              LOGGER.info("Container was not running.", icse);
+              shutdownComplete = true;
+            } catch (InterruptedException e) {
+              LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e);
+            }
+            if (!shutdownComplete) {
+              LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " +
+                  "Stopping the processor.");
+              container = null;
+              stop();
+            } else {
+              LOGGER.debug("Container " + container.toString() + " shutdown successfully");
+            }
+          } else {
+            LOGGER.debug("Container " + container.toString() + " is not running.");
+          }
+        } else {
+          LOGGER.debug("Container is not instantiated yet.");
+        }
+      }
+
+      @Override
+      public void onNewJobModel(String processorId, JobModel jobModel) {
+        if (!jobModel.getContainers().containsKey(processorId)) {
+          LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor.");
+          stop();
+        } else {
+          jcContainerShutdownLatch = new CountDownLatch(1);
+
+          SamzaContainerListener containerListener = new SamzaContainerListener() {
+            @Override
+            public void onContainerStart() {
+              if (!processorOnStartCalled) {
+                // processorListener is called on start only the first time the container starts.
+                // It is not called after every re-balance of partitions among the processors
+                processorOnStartCalled = true;
+                if (processorListener != null) {
+                  processorListener.onStart();
+                }
+              } else {
+                LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
+              }
+            }
+
+            @Override
+            public void onContainerStop(boolean pauseByJm) {
+              if (pauseByJm) {
+                LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
+                if (jcContainerShutdownLatch != null) {
+                  jcContainerShutdownLatch.countDown();
+                }
+              } else {  // sp.stop was called or container stopped by itself
+                LOGGER.info("Container " + container.toString() + " stopped.");
+                container = null; // this guarantees that stop() doesn't try to stop container again
+                stop();
+              }
+            }
+
+            @Override
+            public void onContainerFailed(Throwable t) {
+              if (jcContainerShutdownLatch != null) {
+                jcContainerShutdownLatch.countDown();
+              } else {
+                LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+              }
+              LOGGER.error("Container failed. Stopping the processor.", t);
+              container = null;
+              stop();
+            }
+          };
+
+          container = createSamzaContainer(
+              jobModel.getContainers().get(processorId),
+              jobModel.maxChangeLogStreamPartitions,
+              new JmxServer());
+          container.setContainerListener(containerListener);
+          LOGGER.info("Starting container " + container.toString());
+          executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+              .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+          executorService.submit(container::run);
+        }
+      }
+
+      @Override
+      public void onCoordinatorStop() {
+        if (executorService != null) {
+          LOGGER.info("Shutting down the executor service.");
+          executorService.shutdownNow();
+        }
+        if (processorListener != null) {
+          processorListener.onShutdown();
+        }
+      }
+
+      @Override
+      public void onCoordinatorFailure(Throwable e) {
+        LOGGER.info("Coordinator Failed. Stopping the processor.");
+        stop();
+        if (processorListener != null) {
+          processorListener.onFailure(e);
+        }
+      }
+    };
   }
 }
index 7bca074..6b8e3c7 100644 (file)
@@ -31,6 +31,9 @@ import org.apache.samza.annotation.InterfaceStability;
 public interface StreamProcessorLifecycleListener {
   /**
    * Callback when the {@link StreamProcessor} is started
+   * <p>This callback is invoked only once, when {@link org.apache.samza.container.SamzaContainer} starts for the first
+   * time in the {@link StreamProcessor}. When there is a re-balance of tasks/partitions among the processors, the
+   * container may temporarily be "paused" and re-started again. For such re-starts, this callback is NOT invoked.
    */
   void onStart();
 
index 80350df..920cc3d 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaToJavaUtils;
 import org.apache.samza.util.Util;
@@ -55,6 +56,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
   private final JobModel jobModel;
   private final String containerId;
+  private volatile Throwable containerException = null;
 
   public LocalContainerRunner(JobModel jobModel, String containerId) {
     super(jobModel.getConfig());
@@ -71,14 +73,30 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
       Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerModel.getProcessorId(),
           containerModel,
           config,
           jobModel.maxChangeLogStreamPartitions,
-          SamzaContainer.getLocalityManager(containerId, config),
           jmxServer,
           Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
           taskFactory);
+      container.setContainerListener(
+          new SamzaContainerListener() {
+            @Override
+            public void onContainerStart() {
+              log.info("Container Started");
+            }
+
+            @Override
+            public void onContainerStop(boolean invokedExternally) {
+              log.info("Container Stopped");
+            }
+
+            @Override
+            public void onContainerFailed(Throwable t) {
+              log.info("Container Failed");
+              containerException = t;
+            }
+          });
 
       container.run();
     } finally {
@@ -86,6 +104,10 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
         jmxServer.stop();
       }
     }
+    if (containerException != null) {
+      log.error("Container stopped with Exception. Exiting process now.", containerException);
+      System.exit(1);
+    }
   }
 
   @Override
index 0d74fb8..61ead18 100644 (file)
  */
 package org.apache.samza.standalone;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.processor.SamzaContainerController;
-import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
@@ -38,6 +33,10 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Standalone Job Coordinator does not implement any leader elector module or cluster manager
  *
@@ -62,87 +61,79 @@ import org.slf4j.LoggerFactory;
  * </ul>
  * */
 public class StandaloneJobCoordinator implements JobCoordinator {
-  private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
   private final String processorId;
   private final Config config;
-  private final JobModel jobModel;
-  private final SamzaContainerController containerController;
+  private JobCoordinatorListener coordinatorListener = null;
 
-  @VisibleForTesting
-  StandaloneJobCoordinator(
-      ProcessorIdGenerator processorIdGenerator,
-      Config config,
-      SamzaContainerController containerController,
-      JobModel jobModel) {
-    this.processorId = processorIdGenerator.generateProcessorId(config);
-    this.config = config;
-    this.containerController = containerController;
-    this.jobModel = jobModel;
-  }
-
-  public StandaloneJobCoordinator(String processorId, Config config, SamzaContainerController containerController) {
-    this.config = config;
-    this.containerController = containerController;
+  public StandaloneJobCoordinator(String processorId, Config config) {
     this.processorId = processorId;
-
-    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
-      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
-      if (systemFactoryClassName == null) {
-        log.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-        throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-      }
-      SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
-      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
-    }
-
-    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
-
-    /** TODO:
-     * Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
-     * in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
-     * TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
-     * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
-     */
-    this.jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
+    this.config = config;
   }
 
   @Override
   public void start() {
     // No-op
-    JobModel jobModel = getJobModel();
-    containerController.startContainer(
-        jobModel.getContainers().get(getProcessorId()),
-        jobModel.getConfig(),
-        jobModel.maxChangeLogStreamPartitions);
+    JobModel jobModel = null;
+    try {
+      jobModel = getJobModel();
+    } catch (Exception e) {
+      LOGGER.error("Exception while trying to getJobModel.", e);
+      if (coordinatorListener != null) {
+        coordinatorListener.onCoordinatorFailure(e);
+      }
+    }
+    if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
+      if (coordinatorListener != null) {
+        coordinatorListener.onNewJobModel(processorId, jobModel);
+      }
+    } else {
+      stop();
+    }
   }
 
   @Override
   public void stop() {
     // No-op
-    containerController.shutdown();
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+      coordinatorListener.onCoordinatorStop();
+    }
   }
 
-  /**
-   * Waits for a specified amount of time for the JobCoordinator to fully start-up, which means it should be ready to
-   * process messages. In a Standalone use-case, it may be sufficient to wait for the container to start-up. In case of
-   * ZK based Standalone use-case, it also includes registration with ZK, the initialization of leader elector module etc.
-   *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   */
   @Override
-  public boolean awaitStart(long timeoutMs) throws InterruptedException {
-    return containerController.awaitStart(timeoutMs);
+  public String getProcessorId() {
+    return processorId;
   }
 
   @Override
-  public String getProcessorId() {
-    return processorId;
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
   }
 
   @Override
   public JobModel getJobModel() {
-    return jobModel;
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+        throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+      }
+      SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
+    }
+
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
+        Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+
+    /** TODO:
+     Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
+     in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
+     TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
+     (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
+     */
+    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
   }
 }
index 0faeca9..8c27ebe 100644 (file)
@@ -21,11 +21,10 @@ package org.apache.samza.standalone;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
-import org.apache.samza.processor.SamzaContainerController;
 
 public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController) {
-    return new StandaloneJobCoordinator(processorId, config, containerController);
+  public JobCoordinator getJobCoordinator(String processorId, Config config) {
+    return new StandaloneJobCoordinator(processorId, config);
   }
 }
\ No newline at end of file
index 0afd840..20de43c 100644 (file)
@@ -119,7 +119,6 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
 
   @Override
   public void waitForBarrier(String version, String participantName, Runnable callback) {
-
     setPaths(version);
     final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, participantName);
 
index 61f7876..b6e3aed 100644 (file)
@@ -58,24 +58,17 @@ public class ZkControllerImpl implements ZkController {
             keyBuilder.getJobModelPathPrefix()});
   }
 
-  private void onBecomeLeader() {
-
-    listenToProcessorLiveness(); // subscribe for adding new processors
-
-    // inform the caller
-    zkControllerListener.onBecomeLeader();
-
-  }
-
   @Override
   public void register() {
-
     // TODO - make a loop here with some number of attempts.
     // possibly split into two method - becomeLeader() and becomeParticipant()
     leaderElector.tryBecomeLeader(new LeaderElectorListener() {
       @Override
       public void onBecomingLeader() {
-        onBecomeLeader();
+        listenToProcessorLiveness();
+
+        // inform the caller
+        zkControllerListener.onBecomeLeader();
       }
     });
 
index 1ddedbc..d2d0199 100644 (file)
@@ -35,7 +35,7 @@ import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
@@ -53,23 +53,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   private final ZkUtils zkUtils;
   private final String processorId;
-
   private final ZkController zkController;
-  private final SamzaContainerController containerController;
   private final ScheduleAfterDebounceTime debounceTimer;
   private final StreamMetadataCache  streamMetadataCache;
-  private final ZkKeyBuilder keyBuilder;
   private final Config config;
   private final CoordinationUtils coordinationUtils;
 
+  private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
-  public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer,
-                          SamzaContainerController containerController) {
+
+  public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer) {
+    this.processorId = processorId;
     this.debounceTimer = debounceTimer;
-    this.containerController = containerController;
     this.config = config;
-    this.processorId = processorId;
 
     this.coordinationUtils = Util.
         <CoordinationServiceFactory>getObj(
@@ -78,7 +74,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
 
     this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
-    this.keyBuilder = zkUtils.getKeyBuilder();
     this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
 
     streamMetadataCache = getStreamMetadataCache();
@@ -109,20 +104,23 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   @Override
   public void stop() {
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
     zkController.stop();
-    if (containerController != null)
-      containerController.stopContainer();
+    if (coordinatorListener != null) {
+      coordinatorListener.onCoordinatorStop();
+    }
   }
 
   @Override
-  public boolean awaitStart(long timeoutMs)
-      throws InterruptedException {
-    return containerController.awaitStart(timeoutMs);
+  public String getProcessorId() {
+    return processorId;
   }
 
   @Override
-  public String getProcessorId() {
-    return processorId;
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
   }
 
   @Override
@@ -147,13 +145,18 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
     // if list of processors is empty - it means we are called from 'onBecomeLeader'
     generateNewJobModel(processors);
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
   }
 
   @Override
   public void onNewJobModelAvailable(final String version) {
     log.info("pid=" + processorId + "new JobModel available");
     // stop current work
-    containerController.stopContainer();
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
     log.info("pid=" + processorId + "new JobModel available.Container stopped.");
     // get the new job model
     newJobModel = zkUtils.getJobModel(version);
@@ -179,8 +182,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel);
 
     // start the container with the new model
-    containerController.startContainer(jobModel.getContainers().get(processorId), jobModel.getConfig(),
-        jobModel.maxChangeLogStreamPartitions);
+    if (coordinatorListener != null) {
+      coordinatorListener.onNewJobModel(processorId, jobModel);
+    }
   }
 
   /**
index a44565c..a7239eb 100644 (file)
@@ -22,7 +22,6 @@ package org.apache.samza.zk;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
-import org.apache.samza.processor.SamzaContainerController;
 
 public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   /**
@@ -30,17 +29,15 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
    *
    * @param processorId - id of this processor
    * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
-   * @param containerController - controller to allow JobCoordinator control the SamzaContainer.
    * @return An instance of IJobCoordinator
    */
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController) {
+  public JobCoordinator getJobCoordinator(String processorId, Config config) {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
 
     return new ZkJobCoordinator(
         processorId,
         config,
-        debounceTimer,
-        containerController);
+        debounceTimer);
   }
 }
index 8481c92..c7b2b7c 100644 (file)
@@ -22,15 +22,15 @@ package org.apache.samza.container
 import java.io.File
 import java.nio.file.Path
 import java.util
-import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.net.{URL, UnknownHostException}
 
-import org.apache.samza.SamzaException
+import org.apache.samza.{SamzaContainerStatus, SamzaException}
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.{Config, ShellCommandConfig, StorageConfig}
+import org.apache.samza.config.{ClusterManagerConfig, Config, ShellCommandConfig, StorageConfig}
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
@@ -48,7 +48,6 @@ import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.runtime.ApplicationRunner
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.serializers.model.SamzaObjectMapper
@@ -81,8 +80,7 @@ object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
   val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
 
-  def getLocalityManager(containerId: String, config: Config): LocalityManager = {
-    val containerName = getSamzaContainerName(containerId)
+  def getLocalityManager(containerName: String, config: Config): LocalityManager = {
     val registryMap = new MetricsRegistryMap(containerName)
     val coordinatorSystemProducer =
       new CoordinatorStreamSystemFactory()
@@ -108,20 +106,21 @@ object SamzaContainer extends Logging {
         classOf[JobModel])
   }
 
-  def getSamzaContainerName(containerId: String): String = {
-    "samza-container-%s" format containerId
-  }
-
   def apply(
-    containerId: String,
     containerModel: ContainerModel,
     config: Config,
     maxChangeLogStreamPartitions: Int,
-    localityManager: LocalityManager,
     jmxServer: JmxServer,
     customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](),
     taskFactory: Object) = {
-    val containerName = getSamzaContainerName(containerId)
+    val containerId = containerModel.getProcessorId()
+    val containerName = "samza-container-%s" format containerId
+
+    var localityManager: LocalityManager = null
+    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
+      localityManager = getLocalityManager(containerName, config)
+    }
+
     val containerPID = Util.getContainerPID
 
     info("Setting up Samza container: %s" format containerName)
@@ -627,23 +626,25 @@ class SamzaContainer(
   taskThreadPool: ExecutorService = null) extends Runnable with Logging {
 
   val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
-  private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1)
   var shutdownHookThread: Thread = null
 
-  def awaitStart(timeoutMs: Long): Boolean = {
-    try {
-      runLoopStartLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
-    } catch {
-      case ie: InterruptedException =>
-        error("Interrupted while waiting for runloop to start!", ie)
-        throw ie
-    }
+  @volatile private var status = SamzaContainerStatus.NOT_STARTED
+  private var exceptionSeen: Throwable = null
+  private var paused: Boolean = false
+  private var containerListener: SamzaContainerListener = null
+
+  def getStatus(): SamzaContainerStatus = status
+
+  def setContainerListener(listener: SamzaContainerListener): Unit = {
+    containerListener = listener
   }
 
   def run {
     try {
       info("Starting container.")
 
+      status = SamzaContainerStatus.STARTING
+
       startMetrics
       startOffsetManager
       startLocalityManager
@@ -656,16 +657,24 @@ class SamzaContainer(
       startSecurityManger
 
       addShutdownHook
-      runLoopStartLatch.countDown()
       info("Entering run loop.")
+      status = SamzaContainerStatus.STARTED
+      if (containerListener != null) {
+        containerListener.onContainerStart()
+      }
       runLoop.run
     } catch {
       case e: Throwable =>
-        error("Caught exception/error in process loop.", e)
-        throw e
-    } finally {
+        if (status.equals(SamzaContainerStatus.STARTED)) {
+          error("Caught exception/error in run loop.", e)
+        } else {
+          error("Caught exception/error while initializing container.", e)
+        }
+        status = SamzaContainerStatus.FAILED
+        exceptionSeen = e
+    }
+    try {
       info("Shutting down.")
-
       removeShutdownHook
 
       shutdownConsumers
@@ -679,11 +688,64 @@ class SamzaContainer(
       shutdownMetrics
       shutdownSecurityManger
 
+      if (!status.equals(SamzaContainerStatus.FAILED)) {
+        status = SamzaContainerStatus.STOPPED
+      }
+
       info("Shutdown complete.")
+    } catch {
+      case e: Throwable =>
+        error("Caught exception/error while shutting down container.", e)
+        if (exceptionSeen == null) {
+          exceptionSeen = e
+        }
+        status = SamzaContainerStatus.FAILED
+    }
+
+    status match {
+      case SamzaContainerStatus.STOPPED =>
+        if (containerListener != null) {
+          containerListener.onContainerStop(paused)
+        }
+      case SamzaContainerStatus.FAILED =>
+        if (containerListener != null) {
+          containerListener.onContainerFailed(exceptionSeen)
+        }
     }
   }
 
-  def shutdown() = {
+  // TODO: We want to introduce a "PAUSED" state for SamzaContainer in the future so that StreamProcessor can pause and
+  // unpause the container when the jobmodel changes.
+  /**
+   * Marks the [[SamzaContainer]] as being paused by the caller due to a change in [[JobModel]] and then, asynchronously
+   * shuts down this [[SamzaContainer]]
+   */
+  def pause(): Unit = {
+    paused = true
+    shutdown()
+  }
+
+  /**
+   * <p>
+   *   Asynchronously shuts down this [[SamzaContainer]]
+   * </p>
+   * <br>
+   * <b>Implementation</b>: Stops the [[RunLoop]], which will eventually transition the container from
+   * [[SamzaContainerStatus.STARTED]] to either [[SamzaContainerStatus.STOPPED]] or [[SamzaContainerStatus.FAILED]].
+   * Based on the final `status`, [[SamzaContainerListener#onContainerStop(boolean)]] or
+   * [[SamzaContainerListener#onContainerFailed(Throwable)]] will be invoked respectively.
+   *
+   * @throws IllegalContainerStateException (a SamzaException) when the container has already been stopped or failed
+   */
+  def shutdown(): Unit = {
+    if (status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED) {
+      throw new IllegalContainerStateException("Cannot shutdown a container with status - " + status)
+    }
+    shutdownRunLoop()
+  }
+
+  // Shutdown Runloop
+  def shutdownRunLoop() = {
     runLoop match {
       case runLoop: RunLoop => runLoop.shutdown
       case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
@@ -809,10 +871,7 @@ class SamzaContainer(
     shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
       override def run() = {
         info("Shutting down, will wait up to %s ms" format shutdownMs)
-        runLoop match {
-          case runLoop: RunLoop => runLoop.shutdown
-          case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
-        }
+        shutdownRunLoop()  //TODO: Pull out shutdown hook to LocalContainerRunner or SP
         try {
           runLoopThread.join(shutdownMs)
         } catch {
@@ -923,3 +982,14 @@ class SamzaContainer(
     }
   }
 }
+
+/**
+ * Exception thrown when the SamzaContainer tries to transition to an illegal state.
+ * {@link SamzaContainerStatus} has more details on the state transitions.
+ *
+ * @param s String, Message associated with the exception
+ * @param t Throwable, Wrapped error/exception thrown, if any.
+ */
+class IllegalContainerStateException(s: String, t: Throwable) extends SamzaException(s, t) {
+  def this(s: String) = this(s, null)
+}
index e0522b1..a61a297 100644 (file)
 
 package org.apache.samza.job.local
 
-import java.lang.Thread.UncaughtExceptionHandler
-
+import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
+import org.apache.samza.job.{ApplicationStatus, StreamJob}
 import org.apache.samza.util.Logging
-import org.apache.samza.job.StreamJob
-import org.apache.samza.job.ApplicationStatus
-import org.apache.samza.job.ApplicationStatus.New
-import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
-import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
 
 class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
   @volatile var jobStatus: Option[ApplicationStatus] = None
index dcef3af..cb36863 100644 (file)
 package org.apache.samza.job.local
 
 
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap}
-import org.apache.samza.runtime.LocalContainerRunner
-import org.apache.samza.task.TaskFactoryUtil
-import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
-import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.container.SamzaContainer
-import org.apache.samza.job.{ StreamJob, StreamJobFactory }
 import org.apache.samza.config.JobConfig._
+import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.container.{SamzaContainerListener, SamzaContainer}
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.job.{StreamJob, StreamJobFactory}
+import org.apache.samza.metrics.{JmxServer, MetricsReporter}
+import org.apache.samza.runtime.LocalContainerRunner
+import org.apache.samza.task.TaskFactoryUtil
+import org.apache.samza.util.Logging
 
 /**
  * Creates a new Thread job with the given config
@@ -54,18 +51,32 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       case _ => None
     }
 
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        error("Container failed.", t)
+        throw t
+      }
+
+      override def onContainerStop(pausedOrNot: Boolean): Unit = {
+      }
+
+      override def onContainerStart(): Unit = {
+
+      }
+    }
     try {
       coordinator.start
-      new ThreadJob(
-            SamzaContainer(
-              containerModel.getProcessorId,
-              containerModel,
-              config,
-              jobModel.maxChangeLogStreamPartitions,
-              null,
-              jmxServer,
-              Map[String, MetricsReporter](),
-              taskFactory))
+      val container = SamzaContainer(
+        containerModel,
+        config,
+        jobModel.maxChangeLogStreamPartitions,
+        jmxServer,
+        Map[String, MetricsReporter](),
+        taskFactory)
+      container.setContainerListener(containerListener)
+
+      val threadJob = new ThreadJob(container)
+      threadJob
     } finally {
       coordinator.stop
       jmxServer.stop
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
new file mode 100644 (file)
index 0000000..4a654dc
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.processor;
+
+import org.apache.samza.SamzaContainerStatus;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.RunLoop;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.JmxServer;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStreamProcessor {
+
+  class TestableStreamProcessor extends StreamProcessor {
+    private final CountDownLatch containerStop = new CountDownLatch(1);
+    private final CountDownLatch runLoopStartForMain = new CountDownLatch(1);
+    private SamzaContainer containerReference = null;
+
+    public TestableStreamProcessor(
+        Config config,
+        Map<String, MetricsReporter> customMetricsReporters,
+        StreamTaskFactory streamTaskFactory,
+        StreamProcessorLifecycleListener processorListener,
+        JobCoordinator jobCoordinator) {
+      super(config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator);
+    }
+
+    @Override
+    SamzaContainer createSamzaContainer(
+        ContainerModel containerModel,
+        int maxChangelogStreamPartitions,
+        JmxServer jmxServer) {
+      RunLoop mockRunLoop = mock(RunLoop.class);
+      doAnswer(invocation ->
+        {
+          try {
+            runLoopStartForMain.countDown();
+            containerStop.await();
+          } catch (InterruptedException e) {
+            System.out.println("In exception" + e);
+            e.printStackTrace();
+          }
+          return null;
+        }).when(mockRunLoop).run();
+
+      doAnswer(invocation ->
+        {
+          containerStop.countDown();
+          return null;
+        }).when(mockRunLoop).shutdown();
+      containerReference = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, null, mock(StreamTask.class));
+      return containerReference;
+    }
+  }
+
+  /**
+   * Tests stop() method when Container AND JobCoordinator are running
+   */
+  @Test
+  public void testStopByProcessor() {
+    JobCoordinator mockJobCoordinator = mock(JobCoordinator.class);
+
+    final CountDownLatch processorListenerStop = new CountDownLatch(1);
+    final CountDownLatch processorListenerStart = new CountDownLatch(1);
+
+    TestableStreamProcessor processor = new TestableStreamProcessor(
+        new MapConfig(),
+        new HashMap<>(),
+        mock(StreamTaskFactory.class),
+        new StreamProcessorLifecycleListener() {
+          @Override
+          public void onStart() {
+            processorListenerStart.countDown();
+          }
+
+          @Override
+          public void onShutdown() {
+            processorListenerStop.countDown();
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+
+          }
+        },
+        mockJobCoordinator);
+
+    Map containers = mock(Map.class);
+    doReturn(true).when(containers).containsKey(anyString());
+    when(containers.get(anyString())).thenReturn(mock(ContainerModel.class));
+    JobModel mockJobModel = mock(JobModel.class);
+    when(mockJobModel.getContainers()).thenReturn(containers);
+
+    final CountDownLatch coordinatorStop = new CountDownLatch(1);
+    final Thread jcThread = new Thread(() ->
+      {
+        try {
+          processor.jobCoordinatorListener.onNewJobModel("1", mockJobModel);
+          coordinatorStop.await();
+          processor.jobCoordinatorListener.onCoordinatorStop();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      });
+
+    doAnswer(invocation ->
+      {
+        coordinatorStop.countDown();
+        return null;
+      }).when(mockJobCoordinator).stop();
+
+    doAnswer(invocation ->
+      {
+        jcThread.start();
+        return null;
+      }).when(mockJobCoordinator).start();
+
+    try {
+      processor.start();
+      processorListenerStart.await();
+
+      Assert.assertEquals(SamzaContainerStatus.STARTED, processor.containerReference.getStatus());
+
+      // This wait is required to ensure that the mockRunloop has actually started.
+      // Otherwise, processor.stop gets triggered before the mockRunloop begins to block.
+      processor.runLoopStartForMain.await();
+
+      processor.stop();
+
+      processorListenerStop.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  // TODO:
+  // Test multiple start / stop and its ordering
+  // test onNewJobModel
+  // test onJobModelExpiry
+  // test Coordinator failure - correctly shuts down the StreamProcessor
+  // test Container failure
+}
index 010ff7e..bc4c47c 100644 (file)
@@ -23,7 +23,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.Partition
+import org.apache.samza.{SamzaContainerStatus, Partition}
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
@@ -37,7 +37,6 @@ import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, Stre
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
-import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
@@ -181,6 +180,11 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       metrics = new SamzaContainerMetrics,
       maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
     val container = new SamzaContainer(
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
@@ -188,19 +192,118 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics,
-      jmxServer = null
+      jmxServer = null)
+
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        onContainerFailedCalled = true
+        onContainerFailedThrowable = t
+      }
+
+      override def onContainerStop(invokedExternally: Boolean): Unit = {
+        onContainerStopCalled = true
+      }
+
+      override def onContainerStart(): Unit = {
+        onContainerStartCalled = true
+      }
+    }
+    container.setContainerListener(containerListener)
+
+    container.run
+    assertTrue(task.wasShutdown)
+    assertFalse(onContainerStartCalled)
+    assertFalse(onContainerStopCalled)
+
+    assertTrue(onContainerFailedCalled)
+    assertNotNull(onContainerFailedThrowable)
+  }
+
+  // An exception in the RunLoop should cause SamzaContainer to transition to FAILED status, shut down its components,
+  // and then invoke the failure callback
+  @Test
+  def testExceptionInTaskProcessRunLoop() {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      var wasShutdown = false
+
+      def init(config: Config, context: TaskContext) {
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+        throw new Exception("Trigger a shutdown, please.")
+      }
+
+      def close {
+        wasShutdown = true
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
     )
-    try {
-      container.run
-      fail("Expected exception to be thrown in run method.")
-    } catch {
-      case e: Exception => // Expected
+
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
+    val mockRunLoop = mock[RunLoop]
+    when(mockRunLoop.run).thenThrow(new RuntimeException("Trigger a shutdown, please."))
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        onContainerFailedCalled = true
+        onContainerFailedThrowable = t
+      }
+
+      override def onContainerStop(invokedExternally: Boolean): Unit = {
+        onContainerStopCalled = true
+      }
+
+      override def onContainerStart(): Unit = {
+        onContainerStartCalled = true
+      }
     }
+    container.setContainerListener(containerListener)
+
+    container.run
     assertTrue(task.wasShutdown)
+    assertTrue(onContainerStartCalled)
+
+    assertFalse(onContainerStopCalled)
+
+    assertTrue(onContainerFailedCalled)
+    assertNotNull(onContainerFailedThrowable)
+
+    assertEquals(SamzaContainerStatus.FAILED, container.getStatus())
   }
 
   @Test
-  def testErrorInTaskInitShutsDownTask {
+  def testErrorInTaskInitShutsDownTask() {
     val task = new StreamTask with InitableTask with ClosableTask {
       var wasShutdown = false
 
@@ -240,6 +343,11 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       metrics = new SamzaContainerMetrics,
       maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
     val container = new SamzaContainer(
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
@@ -247,15 +355,187 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics,
-      jmxServer = null
-    )
-    try {
-      container.run
-      fail("Expected error to be thrown in run method.")
-    } catch {
-      case e: Throwable => // Expected
+      jmxServer = null)
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        onContainerFailedCalled = true
+        onContainerFailedThrowable = t
+      }
+
+      override def onContainerStop(invokedExternally: Boolean): Unit = {
+        onContainerStopCalled = true
+      }
+
+      override def onContainerStart(): Unit = {
+        onContainerStartCalled = true
+      }
     }
+    container.setContainerListener(containerListener)
+
+    container.run
+
     assertTrue(task.wasShutdown)
+
+    assertFalse(onContainerStopCalled)
+    assertFalse(onContainerStartCalled)
+
+    assertTrue(onContainerFailedCalled)
+    assertNotNull(onContainerFailedThrowable)
+  }
+
+  @Test
+  def testRunloopShutdownIsClean(): Unit = {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      var wasShutdown = false
+
+      def init(config: Config, context: TaskContext) {
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+      }
+
+      def close {
+        wasShutdown = true
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
+    val mockRunLoop = mock[RunLoop]
+    when(mockRunLoop.run).then(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        Thread.sleep(100)
+      }
+    })
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+      val containerListener = new SamzaContainerListener {
+        override def onContainerFailed(t: Throwable): Unit = {
+          onContainerFailedCalled = true
+          onContainerFailedThrowable = t
+        }
+
+        override def onContainerStop(invokedExternally: Boolean): Unit = {
+          onContainerStopCalled = true
+        }
+
+        override def onContainerStart(): Unit = {
+          onContainerStartCalled = true
+        }
+      }
+    container.setContainerListener(containerListener)
+
+    container.run
+    assertFalse(onContainerFailedCalled)
+    assertTrue(onContainerStartCalled)
+    assertTrue(onContainerStopCalled)
+  }
+
+  @Test
+  def testFailureDuringShutdown: Unit = {
+    val task = new StreamTask with InitableTask with ClosableTask {
+      def init(config: Config, context: TaskContext) {
+      }
+
+      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+
+      }
+
+      def close {
+        throw new Exception("Exception during shutdown, please.")
+      }
+    }
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+
+    @volatile var onContainerFailedCalled = false
+    @volatile var onContainerStopCalled = false
+    @volatile var onContainerStartCalled = false
+    @volatile var onContainerFailedThrowable: Throwable = null
+
+    val mockRunLoop = mock[RunLoop]
+    when(mockRunLoop.run).then(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        Thread.sleep(100)
+      }
+    })
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunLoop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+
+    val containerListener = new SamzaContainerListener {
+        override def onContainerFailed(t: Throwable): Unit = {
+          onContainerFailedCalled = true
+          onContainerFailedThrowable = t
+        }
+
+        override def onContainerStop(invokedExternally: Boolean): Unit = {
+          onContainerStopCalled = true
+        }
+
+        override def onContainerStart(): Unit = {
+          onContainerStartCalled = true
+        }
+      }
+    container.setContainerListener(containerListener)
+
+    container.run
+
+    assertTrue(onContainerFailedCalled)
   }
 
   @Test
@@ -303,8 +583,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = containerMetrics,
-      jmxServer = null
-    )
+      jmxServer = null)
+
     container.startStores
     assertNotNull(containerMetrics.taskStoreRestorationMetrics)
     assertNotNull(containerMetrics.taskStoreRestorationMetrics.get(taskName))
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
new file mode 100644 (file)
index 0000000..f5a8ee5
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.processor
+
+import java.util.Collections
+
+import org.apache.samza.config.MapConfig
+import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName}
+import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.chooser.RoundRobinChooser
+import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers}
+import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
+
+
+object StreamProcessorTestUtils {
+  def getDummyContainer(mockRunloop: RunLoop, containerListener: SamzaContainerListener, streamTask: StreamTask) = {
+    val config = new MapConfig
+    val taskName = new TaskName("taskName")
+    val consumerMultiplexer = new SystemConsumers(
+      new RoundRobinChooser,
+      Map[String, SystemConsumer]())
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager)
+    val collector = new TaskInstanceCollector(producerMultiplexer)
+    val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName))
+    val taskInstance: TaskInstance = new TaskInstance(
+      streamTask,
+      taskName,
+      config,
+      new TaskInstanceMetrics,
+      null,
+      consumerMultiplexer,
+      collector,
+      containerContext
+    )
+
+    val container = new SamzaContainer(
+      containerContext = containerContext,
+      taskInstances = Map(taskName -> taskInstance),
+      runLoop = mockRunloop,
+      consumerMultiplexer = consumerMultiplexer,
+      producerMultiplexer = producerMultiplexer,
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null)
+    if (containerListener != null) {
+      container.setContainerListener(containerListener)
+    }
+    container
+  }
+}
\ No newline at end of file
index a786468..f5bc73a 100644 (file)
 
 package org.apache.samza.system.kafka;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
@@ -31,7 +27,13 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 
 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
index f37a224..75609aa 100644 (file)
@@ -260,7 +260,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     boolean latchResult = false;
     processor.start();
     try {
-      processor.awaitStart(10000);
+      Thread.sleep(10000);
       latchResult = latch.await(10, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       e.printStackTrace();
index 29fb6d3..59e9a89 100644 (file)
@@ -222,7 +222,8 @@ class StreamTaskTestUtil {
   def stopJob(job: StreamJob) {
     // Shutdown task.
     job.kill
-    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(60000))
+    val status = job.waitForFinish(60000)
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, status)
   }
 
   /**