SAMZA-1790: LocalContainerRunner should not extend AbstractApplicationRunner.
authorPrateek Maheshwari <pmaheshwari@apache.org>
Thu, 2 Aug 2018 13:55:54 +0000 (06:55 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Thu, 2 Aug 2018 13:55:54 +0000 (06:55 -0700)
LocalContainerRunner is the launcher for the process running SamzaContainer in YARN. It extends the AbstractApplicationRunner since the container was using ApplicationRunner#getStreamSpec to create StreamSpecs from config to create the High Level API DAG. It doesn't implement any of the other APIs from the ApplicationRunner.

With SAMZA-1659 and SAMZA-1745, SamzaContainer no longer needs access to StreamSpec to create and execute the High Level API DAG. We can now clean up the LocalContainerRunner implementation so that it doesn't need to implement the ApplicationRunner interface.

Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Jagadish Venkatraman <vjagadis1989@gmail.com>, Yi Pan <nickpan47@gmail.com>

Closes #586 from prateekm/lcr-cleanup

samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java

index e6e622d..fe75883 100644 (file)
@@ -31,9 +31,9 @@ import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.container.SamzaContainerListener;
-import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaJavaUtil;
@@ -41,38 +41,49 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
- * have a local runner for yarn before we consolidate the Yarn container and coordination into a
- * {@link org.apache.samza.processor.StreamProcessor}. This class will be replaced by the {@link org.apache.samza.processor.StreamProcessor}
- * local runner once that's done.
- *
- * Since we don't have the {@link org.apache.samza.coordinator.JobCoordinator} implementation in Yarn, the components (jobModel and containerId)
- * are directly inside the runner.
+ * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
  */
-public class LocalContainerRunner extends AbstractApplicationRunner {
+public class LocalContainerRunner {
   private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
-  private final JobModel jobModel;
-  private final String containerId;
-  private volatile Throwable containerRunnerException = null;
-  private ContainerHeartbeatMonitor containerHeartbeatMonitor;
-  private SamzaContainer container;
-
-  public LocalContainerRunner(JobModel jobModel, String containerId) {
-    super(jobModel.getConfig());
-    this.jobModel = jobModel;
-    this.containerId = containerId;
-  }
+  private static volatile Throwable containerRunnerException = null;
 
-  @Override
-  public void runTask() {
-    throw new UnsupportedOperationException("Running StreamTask is not implemented for LocalContainerRunner");
-  }
+  public static void main(String[] args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new SamzaUncaughtExceptionHandler(() -> {
+          log.info("Exiting process now.");
+          System.exit(1);
+        }));
+
+    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+    log.info(String.format("Got container ID: %s", containerId));
+    System.out.println(String.format("Container ID: %s", containerId));
 
-  @Override
-  public void run(StreamApplication streamApp) {
-    Object taskFactory = getTaskFactory(streamApp);
+    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
+    System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
 
-    container = SamzaContainer$.MODULE$.apply(
+    int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
+    JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
+    Config config = jobModel.getConfig();
+    JobConfig jobConfig = new JobConfig(config);
+    if (jobConfig.getName().isEmpty()) {
+      throw new SamzaException("can not find the job name");
+    }
+    String jobName = jobConfig.getName().get();
+    String jobId = jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
+    MDC.put("containerName", "samza-container-" + containerId);
+    MDC.put("jobName", jobName);
+    MDC.put("jobId", jobId);
+
+    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
+    Object taskFactory = getTaskFactory(streamApp, config);
+    run(taskFactory, containerId, jobModel, config);
+
+    System.exit(0);
+  }
+
+  private static void run(Object taskFactory, String containerId, JobModel jobModel, Config config) {
+    SamzaContainer container = SamzaContainer$.MODULE$.apply(
         containerId,
         jobModel,
         config,
@@ -98,75 +109,43 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
           }
         });
 
-    startContainerHeartbeatMonitor();
+    ContainerHeartbeatMonitor heartbeatMonitor = createContainerHeartbeatMonitor(container);
+    if (heartbeatMonitor != null) {
+      heartbeatMonitor.start();
+    }
+
     container.run();
-    stopContainerHeartbeatMonitor();
-    
+
+    if (heartbeatMonitor != null) {
+      heartbeatMonitor.stop();
+    }
+
     if (containerRunnerException != null) {
       log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
       System.exit(1);
     }
   }
 
-  private Object getTaskFactory(StreamApplication streamApp) {
+  private static Object getTaskFactory(StreamApplication streamApp, Config config) {
     if (streamApp != null) {
+      StreamGraphSpec graphSpec = new StreamGraphSpec(config);
       streamApp.init(graphSpec, config);
       return TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager());
     }
     return TaskFactoryUtil.createTaskFactory(config);
   }
 
-  @Override
-  public void kill(StreamApplication streamApp) {
-    // Ultimately this class probably won't end up extending ApplicationRunner, so this will be deleted
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ApplicationStatus status(StreamApplication streamApp) {
-    // Ultimately this class probably won't end up extending ApplicationRunner, so this will be deleted
-    throw new UnsupportedOperationException();
-  }
-
-  public static void main(String[] args) throws Exception {
-    Thread.setDefaultUncaughtExceptionHandler(
-        new SamzaUncaughtExceptionHandler(() -> {
-          log.info("Exiting process now.");
-          System.exit(1);
-        }));
-
-    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
-    log.info(String.format("Got container ID: %s", containerId));
-    System.out.println(String.format("Container ID: %s", containerId));
-    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
-    System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
-    int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
-    JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
-    Config config = jobModel.getConfig();
-    JobConfig jobConfig = new JobConfig(config);
-    if (jobConfig.getName().isEmpty()) {
-      throw new SamzaException("can not find the job name");
-    }
-    String jobName = jobConfig.getName().get();
-    String jobId = jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
-    MDC.put("containerName", "samza-container-" + containerId);
-    MDC.put("jobName", jobName);
-    MDC.put("jobId", jobId);
-
-    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
-    LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
-    localContainerRunner.run(streamApp);
-
-    System.exit(0);
-  }
-
-  private void startContainerHeartbeatMonitor() {
+  /**
+   * Creates a new container heartbeat monitor if possible.
+   * @param container the container to monitor
+   * @return a new {@link ContainerHeartbeatMonitor} instance, or null if could not create one
+   */
+  private static ContainerHeartbeatMonitor createContainerHeartbeatMonitor(SamzaContainer container) {
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
     String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
     if (executionEnvContainerId != null) {
       log.info("Got execution environment container id: {}", executionEnvContainerId);
-      containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> {
+      return new ContainerHeartbeatMonitor(() -> {
           try {
             container.shutdown();
             containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
@@ -175,16 +154,9 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
             System.exit(1);
           }
         }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
-      containerHeartbeatMonitor.start();
     } else {
-      containerHeartbeatMonitor = null;
-      log.warn("executionEnvContainerId not set. Container heartbeat monitor will not be started");
-    }
-  }
-
-  private void stopContainerHeartbeatMonitor() {
-    if (containerHeartbeatMonitor != null) {
-      containerHeartbeatMonitor.stop();
+      log.warn("Execution environment container id not set. Container heartbeat monitor will not be created");
+      return null;
     }
   }
 }