SAMZA-1337: Use StreamTask with the LocalApplicationRunner
author Boris Shkolnik <boryas@apache.org>
Mon, 26 Jun 2017 19:25:48 +0000 (12:25 -0700)
committer navina <navina@apache.org>
Mon, 26 Jun 2017 19:25:48 +0000 (12:25 -0700)
Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Navina Ramesh <navina@apache.org>, Xinyu Liu <xiliu@apache.org>, Bharath Kumarasubramanian <codin.martial@gmail.com>

Closes #231 from sborya/LocalAppRunnerWithStreamTask

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java

index 0586e9e..440dd33 100644 (file)
@@ -71,6 +71,17 @@ public abstract class ApplicationRunner {
   }
 
   /**
+   * Deploy and run the Samza jobs to execute {@link org.apache.samza.task.StreamTask}.
+   * It is non-blocking, so it does not wait while the application is running.
+   * This method assumes your task.class is specified in the configs.
+   *
+   * NOTE: this interface will most likely change in the future.
+   */
+  @InterfaceStability.Evolving
+  public abstract void runTask();
+
+
+  /**
    * Deploy and run the Samza jobs to execute {@link StreamApplication}.
    * It is non-blocking so it doesn't wait for the application running.
    *
index b1f0aba..b0bfc8a 100644 (file)
@@ -32,7 +32,9 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
@@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory;
  */
 public class LocalApplicationRunner extends AbstractApplicationRunner {
 
-  private static final Logger log = LoggerFactory.getLogger(LocalApplicationRunner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
   // Latch id that's used for awaiting the init of application before creating the StreamProcessors
   private static final String INIT_LATCH_ID = "init";
   // Latch timeout is set to 10 min
@@ -134,6 +136,25 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   @Override
+  public void runTask() {
+    JobConfig jobConfig = new JobConfig(this.config);
+
+    // validation
+    String taskName = new TaskConfig(config).getTaskClass().getOrElse(null);
+    if (taskName == null) {
+      throw new SamzaException("Neither APP nor task.class are defined defined");
+    }
+    LOG.info("LocalApplicationRunner will run " + taskName);
+    LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
+
+    StreamProcessor processor = createStreamProcessor(jobConfig, null, listener);
+
+    numProcessorsToStart.set(1);
+    listener.setProcessor(processor);
+    processor.start();
+  }
+
+  @Override
   public void run(StreamApplication app) {
     try {
       // 1. initialize and plan
@@ -148,7 +169,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
         throw new SamzaException("No jobs to run.");
       }
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
+          LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
           LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
           StreamProcessor processor = createStreamProcessor(jobConfig, app, listener);
           listener.setProcessor(processor);
@@ -180,7 +201,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     try {
       shutdownLatch.await();
     } catch (Exception e) {
-      log.error("Wait is interrupted by exception", e);
+      LOG.error("Wait is interrupted by exception", e);
       throw new SamzaException(e);
     }
   }
index d690c80..5d0e455 100644 (file)
@@ -19,6 +19,8 @@
 
 package org.apache.samza.runtime;
 
+import java.util.HashMap;
+import java.util.Random;
 import org.apache.log4j.MDC;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -41,9 +43,6 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Random;
-
 /**
  * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
  * have a local runner for yarn before we consolidate the Yarn container and coordination into a
@@ -68,6 +67,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
   }
 
   @Override
+  public void runTask() { // Not implemented for this runner: every call throws.
+    throw new UnsupportedOperationException("Running StreamTask is not implemented for LocalContainerRunner");
+  }
+
+  @Override
   public void run(StreamApplication streamApp) {
     ContainerModel containerModel = jobModel.getContainers().get(containerId);
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
index 309d8c8..53cd2f6 100644 (file)
@@ -35,12 +35,17 @@ import org.slf4j.LoggerFactory;
  */
 public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
-  private static final Logger log = LoggerFactory.getLogger(RemoteApplicationRunner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class);
 
   public RemoteApplicationRunner(Config config) {
     super(config);
   }
 
+  @Override
+  public void runTask() {
+    throw new UnsupportedOperationException("Running StreamTask is not implemented for RemoteReplicationRunner");
+  }
+
   /**
    * Run the {@link StreamApplication} on the remote cluster
    * @param app a StreamApplication
@@ -57,7 +62,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
       // 3. submit jobs for remote execution
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
+          LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
           JobRunner runner = new JobRunner(jobConfig);
           runner.run(true);
         });
@@ -72,7 +77,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       ExecutionPlan plan = getExecutionPlan(app);
 
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.info("Killing job {}", jobConfig.getName());
+          LOG.info("Killing job {}", jobConfig.getName());
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
@@ -92,7 +97,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       for (JobConfig jobConfig : plan.getJobConfigs()) {
         JobRunner runner = new JobRunner(jobConfig);
         ApplicationStatus status = runner.status();
-        log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
+        LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
 
         switch (status.getStatusCode()) {
           case New:
index a0558ef..cb32252 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
@@ -201,7 +202,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     ApplicationConfig appConfig = new ApplicationConfig(config);
     if (appConfig.getProcessorId() != null) {
       return appConfig.getProcessorId();
-    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+    } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
       ProcessorIdGenerator idGenerator =
           ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
       return idGenerator.generateProcessorId(config);
index aaacd6e..ed13b5b 100644 (file)
@@ -368,6 +368,11 @@ public class TestAbstractApplicationRunner {
     }
 
     @Override
+    public void runTask() { // Stub required by the abstract method; always throws.
+      throw new UnsupportedOperationException("runTask is not supported in this test");
+    }
+
+    @Override
     public void run(StreamApplication streamApp) {
       // do nothing. We're only testing the stream creation methods at this point.
     }
index 05f3cc2..d22fbae 100644 (file)
@@ -87,6 +87,11 @@ public class TestApplicationRunnerMain {
     }
 
     @Override
+    public void runTask() { // Stub required by the abstract method; always throws.
+      throw new UnsupportedOperationException("runTask() not supported in this test");
+    }
+
+    @Override
     public void run(StreamApplication streamApp) {
       runCount++;
     }
index 9d15211..a04bd3b 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
@@ -205,6 +206,35 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
+  public void testRunStreamTask() throws Exception {
+    final Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    config.put(TaskConfig.TASK_CLASS(), "org.apache.samza.test.processor.IdentityStreamTask");
+    // Verifies runTask(): the config names a task class and no StreamApplication is passed to the runner.
+
+    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+    // Mock processor whose start() immediately reports onStart and onShutdown to the captured listener.
+    StreamProcessor sp = mock(StreamProcessor.class);
+    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
+
+    doAnswer(i ->
+      {
+        StreamProcessorLifecycleListener listener = captor.getValue();
+        listener.onStart();
+        listener.onShutdown();
+        return null;
+      }).when(sp).start();
+    // Stub createStreamProcessor on a spy so it returns the mock and the listener argument is captured.
+    LocalApplicationRunner spy = spy(runner);
+    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+
+    spy.runTask();
+    // Once the start and shutdown callbacks have been delivered, the status must be SuccessfulFinish.
+    assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null));
+
+  }
+  @Test
   public void testRunComplete() throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());