SAMZA-1137: Instantiate ApplicationRunner in SamzaContainer
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Fri, 24 Mar 2017 20:49:30 +0000 (13:49 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Fri, 24 Mar 2017 20:49:30 +0000 (13:49 -0700)
Create an ApplicationRunner in SamzaContainer to provide StreamSpecs for fluent API.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jacob Maes <jmakes@apache.org>

Closes #94 from xinyuiscool/SAMZA-1137

samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
samza-shell/src/main/bash/run-container.sh
samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java

index 81f9d2b..76e2053 100644 (file)
@@ -105,9 +105,7 @@ public class SamzaContainerController {
         localityManager,
         new JmxServer(),
         Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
-        taskFactory,
-        // TODO: need to use the correct local ApplicationRunner here
-        null);
+        taskFactory);
     log.info("About to start container: " + containerModel.getContainerId());
     containerFuture = executorService.submit(() -> container.run());
   }
index 4d3e8ab..15d9b9d 100644 (file)
@@ -108,17 +108,6 @@ public class StreamProcessor {
     this(processorId, config, customMetricsReporters, (Object) streamTaskFactory);
   }
 
-  /**
-   * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
-   * using the "task.class" configuration instead of a task factory.
-   * @param processorId - this processor Id
-   * @param config - config
-   * @param customMetricsReporters metrics
-   */
-  public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) {
-    this(processorId, config, customMetricsReporters, (Object) null);
-  }
-
   private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
                           Object taskFactory) {
     this.processorId = processorId;
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
new file mode 100644 (file)
index 0000000..4bfad65
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import java.util.HashMap;
+import java.util.Random;
+import org.apache.log4j.MDC;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.JmxServer;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.ScalaToJavaUtils;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
+ * have a local runner for yarn before we consolidate the Yarn container and coordination into a
+ * {@link org.apache.samza.processor.StreamProcessor}. This class will be replaced by the {@link org.apache.samza.processor.StreamProcessor}
+ * local runner once that's done.
+ *
+ * Since we don't have the {@link org.apache.samza.coordinator.JobCoordinator} implementation in Yarn, the components (jobModel and containerId)
+ * are directly inside the runner.
+ */
+public class LocalContainerRunner extends AbstractApplicationRunner {
+  private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
+  private final JobModel jobModel;
+  private final int containerId;
+
+  public LocalContainerRunner(JobModel jobModel, int containerId) {
+    super(jobModel.getConfig());
+    this.jobModel = jobModel;
+    this.containerId = containerId;
+  }
+
+  @Override
+  public void run(StreamApplication streamApp) {
+    JmxServer jmxServer = null;
+    try {
+      jmxServer = new JmxServer();
+      ContainerModel containerModel = jobModel.getContainers().get(containerId);
+      Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
+
+      SamzaContainer container = SamzaContainer$.MODULE$.apply(
+          containerModel.getContainerId(),
+          containerModel,
+          config,
+          jobModel.maxChangeLogStreamPartitions,
+          SamzaContainer.getLocalityManager(containerId, config),
+          jmxServer,
+          Util.javaMapAsScalaMap(new HashMap<String, MetricsReporter>()),
+          taskFactory);
+
+      container.run();
+    } finally {
+      if (jmxServer != null) {
+        jmxServer.stop();
+      }
+    }
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    setExceptionHandler(() -> {
+        log.info("Exiting process now.");
+        System.exit(1);
+      });
+
+    Integer containerId = Integer.valueOf(System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
+    log.info(String.format("Got container ID: %d", containerId));
+    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+    log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
+    int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
+    JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
+    Config config = jobModel.getConfig();
+    JobConfig jobConfig = new JobConfig(config);
+    if (jobConfig.getName().isEmpty()) {
+      throw new SamzaException("can not find the job name");
+    }
+    String jobName = jobConfig.getName().get();
+    String jobId = jobConfig.getJobId().getOrElse(ScalaToJavaUtils.defaultValue("1"));
+    MDC.put("containerName", "samza-container-" + containerId);
+    MDC.put("jobName", jobName);
+    MDC.put("jobId", jobId);
+
+    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
+    new LocalContainerRunner(jobModel, containerId).run(streamApp);
+  }
+
+  /* package private */ static void setExceptionHandler(Runnable runnable) {
+    Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
+      log.error(String.format("Uncaught exception in thread (name=%s).", t.getName(), e));
+      e.printStackTrace(System.err);
+      runnable.run();
+    };
+    Thread.setDefaultUncaughtExceptionHandler(exceptionHandler);
+  }
+}
index e79d278..f8d1c87 100644 (file)
@@ -39,28 +39,35 @@ public class TaskFactoryUtil {
   private static final Logger log = LoggerFactory.getLogger(TaskFactoryUtil.class);
 
   /**
-   * This method loads a task factory class based on the configuration
+   * This method creates a task factory class based on the configuration and {@link StreamApplication}
    *
    * @param config  the {@link Config} for this job
+   * @param streamApp the {@link StreamApplication}
    * @param runner  the {@link ApplicationRunner} to run this job
    * @return  a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory}
    */
-  public static Object fromTaskClassConfig(Config config, ApplicationRunner runner) {
+  public static Object createTaskFactory(Config config, StreamApplication streamApp, ApplicationRunner runner) {
+    return (streamApp != null) ? createStreamOperatorTaskFactory(streamApp, runner) : fromTaskClassConfig(config);
+  }
 
-    String taskClassName;
+  private static StreamTaskFactory createStreamOperatorTaskFactory(StreamApplication streamApp, ApplicationRunner runner) {
+    return () -> new StreamOperatorTask(streamApp, runner);
+  }
 
+  /**
+   * Create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on the configured task.class.
+   * @param config the {@link Config}
+   * @return task factory instance
+   */
+  private static Object fromTaskClassConfig(Config config) {
     // if there is configuration to set the job w/ a specific type of task, instantiate the corresponding task factory
-    if (isStreamOperatorTask(config)) {
-      taskClassName = StreamOperatorTask.class.getName();
-    } else {
-      taskClassName = new TaskConfig(config).getTaskClass().getOrElse(
-          new AbstractFunction0<String>() {
-            @Override
-            public String apply() {
-              throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
-            }
-          });
-    }
+    String taskClassName = new TaskConfig(config).getTaskClass().getOrElse(
+      new AbstractFunction0<String>() {
+        @Override
+        public String apply() {
+          throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
+        }
+      });
 
     log.info("Got task class name: {}", taskClassName);
 
@@ -89,9 +96,6 @@ public class TaskFactoryUtil {
       @Override
       public StreamTask createInstance() {
         try {
-          if (taskClassName.equals(StreamOperatorTask.class.getName())) {
-            return createStreamOperatorTask(config, runner);
-          }
           return (StreamTask) Class.forName(taskClassName).newInstance();
         } catch (Throwable t) {
           log.error("Error loading StreamTask class: {}. error: {}", taskClassName, t);
@@ -147,29 +151,28 @@ public class TaskFactoryUtil {
     }
   }
 
-  private static StreamTask createStreamOperatorTask(Config config, ApplicationRunner runner) throws Exception {
-    Class<?> builderClass = Class.forName(config.get(StreamApplication.APP_CLASS_CONFIG));
-    StreamApplication graphBuilder = (StreamApplication) builderClass.newInstance();
-    return new StreamOperatorTask(graphBuilder, runner);
-  }
-
-  private static boolean isStreamOperatorTask(Config config) {
+  /**
+   * Returns {@link StreamApplication} if it's configured, otherwise null.
+   * @param config Config
+   * throws {@link ConfigException} if there is misconfiguration of StreamApp.
+   * @return {@link StreamApplication} instance
+   */
+  public static StreamApplication createStreamApplication(Config config) {
     if (config.get(StreamApplication.APP_CLASS_CONFIG) != null && !config.get(StreamApplication.APP_CLASS_CONFIG).isEmpty()) {
-
       TaskConfig taskConfig = new TaskConfig(config);
       if (taskConfig.getTaskClass() != null && !taskConfig.getTaskClass().isEmpty()) {
         throw new ConfigException("High level StreamApplication API cannot be used together with low-level API using task.class.");
       }
 
+      String appClassName = config.get(StreamApplication.APP_CLASS_CONFIG);
       try {
-        Class<?> builderClass = Class.forName(config.get(StreamApplication.APP_CLASS_CONFIG));
-        return StreamApplication.class.isAssignableFrom(builderClass);
+        Class<?> builderClass = Class.forName(appClassName);
+        return (StreamApplication) builderClass.newInstance();
       } catch (Throwable t) {
-        log.error("Failed to validate StreamApplication class from the config. {}={}",
-            StreamApplication.APP_CLASS_CONFIG, config.get(StreamApplication.APP_CLASS_CONFIG));
-        return false;
+        throw new ConfigException(String.format("%s is not a StreamApplication.", appClassName));
       }
+    } else {
+      return null;
     }
-    return false;
   }
 }
index 77959dd..e43ddfe 100644 (file)
@@ -23,7 +23,6 @@ import java.io.File
 import java.nio.file.Path
 import java.util
 import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
-import java.lang.Thread.UncaughtExceptionHandler
 import java.net.{URL, UnknownHostException}
 
 import org.apache.samza.SamzaException
@@ -82,49 +81,6 @@ object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
   val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
 
-  def main(args: Array[String]) {
-    safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1)))
-  }
-
-  def safeMain(
-    newJmxServer: () => JmxServer,
-    exceptionHandler: UncaughtExceptionHandler = null) {
-    if (exceptionHandler != null) {
-      Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
-    }
-    putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
-    // Break out the main method to make the JmxServer injectable so we can
-    // validate that we don't leak JMX non-daemon threads if we have an
-    // exception in the main method.
-    val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
-    logger.info("Got container ID: %s" format containerId)
-    val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
-    logger.info("Got coordinator URL: %s" format coordinatorUrl)
-    val jobModel = readJobModel(coordinatorUrl)
-    val containerModel = jobModel.getContainers()(containerId.toInt)
-    val config = jobModel.getConfig
-    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name")))
-    putMDC("jobId", config.getJobId.getOrElse("1"))
-    var jmxServer: JmxServer = null
-
-    try {
-      jmxServer = newJmxServer()
-      val containerModel = jobModel.getContainers.get(containerId.toInt)
-      // TODO: add actual local runner in a container to the parameters
-      SamzaContainer(
-        containerId.toInt,
-        containerModel,
-        config,
-        jobModel.maxChangeLogStreamPartitions,
-        getLocalityManager(containerId, config),
-        jmxServer).run
-    } finally {
-      if (jmxServer != null) {
-        jmxServer.stop
-      }
-    }
-  }
-
   def getLocalityManager(containerId: Int, config: Config): LocalityManager = {
     val containerName = getSamzaContainerName(containerId)
     val registryMap = new MetricsRegistryMap(containerName)
@@ -164,9 +120,7 @@ object SamzaContainer extends Logging {
     localityManager: LocalityManager,
     jmxServer: JmxServer,
     customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](),
-    taskFactory: Object = null,
-    // SAMZA-1137: need to instantiate the ApplicationRunner in the container local JVM and pass it in
-    appRunner: ApplicationRunner = null) = {
+    taskFactory: Object) = {
     val containerName = getSamzaContainerName(containerId)
     val containerPID = Util.getContainerPID
 
@@ -438,11 +392,8 @@ object SamzaContainer extends Logging {
     else
       null
 
-    val taskFactoryInstance = Option(taskFactory)
-      .getOrElse(TaskFactoryUtil.fromTaskClassConfig(config, appRunner))
-
     val finalTaskFactory = TaskFactoryUtil.finalizeTaskFactory(
-      taskFactoryInstance,
+      taskFactory,
       singleThreadMode,
       taskThreadPool)
 
index 164e319..f218543 100644 (file)
 package org.apache.samza.job.local
 
 
+import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap}
+import org.apache.samza.runtime.LocalContainerRunner
+import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
@@ -41,6 +44,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val jobModel = coordinator.jobModel
     val containerModel = jobModel.getContainers.get(0)
     val jmxServer = new JmxServer
+    val streamApp = TaskFactoryUtil.createStreamApplication(config)
+    val appRunner = new LocalContainerRunner(jobModel, 0)
+    val taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, appRunner)
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
     config.getTaskOpts match {
@@ -57,7 +63,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
               config,
               jobModel.maxChangeLogStreamPartitions,
               null,
-              jmxServer))
+              jmxServer,
+              Map[String, MetricsReporter](),
+              taskFactory))
     } finally {
       coordinator.stop
       jmxServer.stop
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalContainerRunner.java
new file mode 100644 (file)
index 0000000..f9465e3
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestLocalContainerRunner {
+  private boolean caughtException = false;
+
+  @Test
+  public void testUncaughtExceptionHandler() throws Exception {
+    Runnable runnable = () -> { caughtException = true; };
+    LocalContainerRunner.setExceptionHandler(runnable);
+
+    try {
+      ((String) null).length();
+    } catch (Exception e) {
+      // catch null pointer exception
+    }
+    assertFalse(caughtException);
+
+    Thread t = new Thread(() -> {
+        throw new RuntimeException("Uncaught exception in another thread. Catch this.");
+      });
+    t.start();
+    t.join();
+    assertTrue(caughtException);
+  }
+}
\ No newline at end of file
index 27168f3..0b051e8 100644 (file)
@@ -33,6 +33,8 @@ import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -51,7 +53,7 @@ public class TestTaskFactoryUtil {
         this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
       }
     });
-    Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+    Object retFactory = TaskFactoryUtil.createTaskFactory(config, null, null);
     assertTrue(retFactory instanceof StreamTaskFactory);
     assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
 
@@ -61,7 +63,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createTaskFactory(config, null, null);
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException cfe) {
       // expected
@@ -69,13 +71,15 @@ public class TestTaskFactoryUtil {
   }
 
   @Test
-  public void testStreamOperatorTaskClass() {
+  public void testCreateStreamApplication() throws Exception {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
         this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication");
       }
     });
-    Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
+    assertNotNull(streamApp);
+    Object retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner);
     assertTrue(retFactory instanceof StreamTaskFactory);
     assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
 
@@ -85,7 +89,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createStreamApplication(config);
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException ce) {
       // expected
@@ -97,7 +101,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createStreamApplication(config);
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException ce) {
       // expected
@@ -108,32 +112,23 @@ public class TestTaskFactoryUtil {
         this.put(StreamApplication.APP_CLASS_CONFIG, "");
       }
     });
-    try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
-      fail("Should have failed w/ empty class name for StreamApplication");
-    } catch (ConfigException ce) {
-      // expected
-    }
+    streamApp = TaskFactoryUtil.createStreamApplication(config);
+    assertNull(streamApp);
 
     config = new MapConfig(new HashMap<>());
-    try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
-      fail(String.format("Should have failed w/ non-existing entry for %s", StreamApplication.APP_CLASS_CONFIG));
-    } catch (ConfigException ce) {
-      // expected
-    }
+    streamApp = TaskFactoryUtil.createStreamApplication(config);
+    assertNull(streamApp);
   }
 
   @Test
-  public void testStreamOperatorTaskClassWithTaskClass() {
+  public void testCreateStreamApplicationWithTaskClass() throws Exception {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
         this.put(StreamApplication.APP_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamApplication");
       }
     });
-    Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
-    assertTrue(retFactory instanceof StreamTaskFactory);
-    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
+    assertNotNull(streamApp);
 
     config = new MapConfig(new HashMap<String, String>() {
       {
@@ -142,7 +137,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createStreamApplication(config);
       fail("should have failed with invalid config");
     } catch (ConfigException ce) {
       // expected
@@ -155,7 +150,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createStreamApplication(config);
       fail("should have failed with invalid config");
     } catch (ConfigException ce) {
       // expected
@@ -163,7 +158,7 @@ public class TestTaskFactoryUtil {
   }
 
   @Test
-  public void testStreamTaskClassWithInvalidStreamGraphBuilder() {
+  public void testStreamTaskClassWithInvalidStreamApplication() throws Exception {
 
     Config config = new MapConfig(new HashMap<String, String>() {
       {
@@ -171,8 +166,8 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
-      fail("should have failed with invalid config");
+      TaskFactoryUtil.createStreamApplication(config);
+      fail("Should have failed w/ no.such.class");
     } catch (ConfigException ce) {
       // expected
     }
@@ -183,7 +178,8 @@ public class TestTaskFactoryUtil {
         this.put(StreamApplication.APP_CLASS_CONFIG, "");
       }
     });
-    Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
+    Object retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner);
     assertTrue(retFactory instanceof StreamTaskFactory);
     assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
 
@@ -194,7 +190,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createStreamApplication(config);
       fail("Should have failed w/ no class not found");
     } catch (ConfigException cne) {
       // expected
@@ -208,7 +204,7 @@ public class TestTaskFactoryUtil {
         this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
       }
     });
-    Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+    Object retFactory = TaskFactoryUtil.createTaskFactory(config, null, null);
     assertTrue(retFactory instanceof AsyncStreamTaskFactory);
     assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
 
@@ -218,7 +214,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createTaskFactory(config, null, null);
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException cfe) {
       // expected
@@ -226,7 +222,7 @@ public class TestTaskFactoryUtil {
   }
 
   @Test
-  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws Exception {
 
     Config config = new MapConfig(new HashMap<String, String>() {
       {
@@ -234,7 +230,7 @@ public class TestTaskFactoryUtil {
       }
     });
     try {
-      TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+      TaskFactoryUtil.createStreamApplication(config);
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException cfe) {
       // expected
@@ -246,7 +242,8 @@ public class TestTaskFactoryUtil {
         this.put(StreamApplication.APP_CLASS_CONFIG, "");
       }
     });
-    Object retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+    StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
+    Object retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner);
     assertTrue(retFactory instanceof AsyncStreamTaskFactory);
     assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
 
@@ -256,7 +253,8 @@ public class TestTaskFactoryUtil {
         this.put(StreamApplication.APP_CLASS_CONFIG, null);
       }
     });
-    retFactory = TaskFactoryUtil.fromTaskClassConfig(config, mockRunner);
+    streamApp = TaskFactoryUtil.createStreamApplication(config);
+    retFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, mockRunner);
     assertTrue(retFactory instanceof AsyncStreamTaskFactory);
     assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
   }
index cf93928..a72a59a 100644 (file)
@@ -214,30 +214,6 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
-  def testUncaughtExceptionHandler {
-    var caughtException = false
-    val exceptionHandler = new UncaughtExceptionHandler {
-      def uncaughtException(t: Thread, e: Throwable) {
-        caughtException = true
-      }
-    }
-    try {
-      SamzaContainer.safeMain(() => null, exceptionHandler)
-    } catch {
-      case _: Exception =>
-      // Expect some random exception from SamzaContainer because we haven't
-      // set any environment variables for container ID, etc.
-    }
-    assertFalse(caughtException)
-    val t = new Thread(new Runnable {
-      def run = throw new RuntimeException("Uncaught exception in another thread. Catch this.")
-    })
-    t.start
-    t.join
-    assertTrue(caughtException)
-  }
-
-  @Test
   def testErrorInTaskInitShutsDownTask {
     val task = new StreamTask with InitableTask with ClosableTask {
       var wasShutdown = false
index b75e668..bc10338 100755 (executable)
@@ -25,4 +25,4 @@
 # Set container name system property for use in Log4J
 [[ $JAVA_OPTS != *-Dsamza.container.name* && ! -z "$SAMZA_CONTAINER_ID" ]] && export JAVA_OPTS="$JAVA_OPTS -Dsamza.container.name=samza-container-$SAMZA_CONTAINER_ID"
 
-exec $(dirname $0)/run-class.sh org.apache.samza.container.SamzaContainer "$@"
\ No newline at end of file
+exec $(dirname $0)/run-class.sh org.apache.samza.runtime.LocalContainerRunner "$@"
\ No newline at end of file
index f58bd8f..a1ad363 100644 (file)
@@ -69,7 +69,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
     // TopicExistsException since StreamProcessor auto-creates them.
     createTopics(inputTopic, outputTopic);
-    final StreamProcessor processor = new StreamProcessor(1, new MapConfig(configs), new HashMap<>());
+    final StreamProcessor processor = new StreamProcessor(1, new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -132,7 +132,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     configMap.remove("task.class");
     final Config configs = new MapConfig(configMap);
 
-    StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>());
+    StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), (StreamTaskFactory) null);
     run(processor, endLatch);
   }