SAMZA-1222: Clean up LocalApplicationRunner
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Thu, 4 May 2017 22:17:56 +0000 (15:17 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Thu, 4 May 2017 22:17:56 +0000 (15:17 -0700)
Clean up the LocalApplicationRunner based on the further feedback. The changes include the following:
1. Remove the processorId from the JobCoordinatorFactory/JobCoordinator interfaces
2. LocalApplicationRunner.run() is non-blocking. Add LocalApplicationRunner.waitForFinish() for blocking for completion
3. Remove the config for CooridnatorServiceFactory, and now the CoordinatorService is created based on the type of JobCoordinator.
4. Clean up the StreamProcessor life cycle listener logic inside LocalApplicationRunner.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #135 from xinyuiscool/SAMZA-1222

15 files changed:
build.gradle
samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java

index 74e5161..c1a261b 100644 (file)
@@ -637,6 +637,7 @@ project(":samza-test_$scalaVersion") {
     testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 
index 8fb991a..bf2c643 100644 (file)
@@ -81,7 +81,8 @@ public abstract class ApplicationRunner {
   }
 
   /**
-   * Deploy and run the Samza jobs to execute {@link StreamApplication}
+   * Deploy and run the Samza jobs to execute {@link StreamApplication}.
+   * It is non-blocking so it doesn't wait for the application running.
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
    */
@@ -89,6 +90,7 @@ public abstract class ApplicationRunner {
 
   /**
    * Kill the Samza jobs represented by {@link StreamApplication}
+   * It is non-blocking so it doesn't wait for the application stopping.
    *
    * @param streamApp  the user-defined {@link StreamApplication} object
    */
index ea0f999..1434231 100644 (file)
@@ -43,7 +43,6 @@ public class ApplicationConfig extends MapConfig {
    * environment. Hence, durability of the identifier is same as the guarantees provided by the runtime environment
    */
   public static final String APP_PROCESSOR_ID_GENERATOR_CLASS = "app.processor-id-generator.class";
-  public static final String APP_COORDINATION_SERVICE_FACTORY_CLASS = "app.coordination.service.factory.class";
   public static final String APP_NAME = "app.name";
   public static final String APP_ID = "app.id";
   public static final String APP_CLASS = "app.class";
@@ -56,10 +55,6 @@ public class ApplicationConfig extends MapConfig {
     return get(APP_PROCESSOR_ID_GENERATOR_CLASS, null);
   }
 
-  public String getCoordinationServiceFactoryClass() {
-    return get(APP_COORDINATION_SERVICE_FACTORY_CLASS);
-  }
-
   public String getAppName() {
     return get(APP_NAME, get(JobConfig.JOB_NAME()));
   }
index c03df06..57632ca 100644 (file)
@@ -23,7 +23,6 @@ import com.google.common.base.Strings;
 
 public class JobCoordinatorConfig extends MapConfig {
   public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
-  public static final String JOB_COORDINATIOIN_SERVICE_FACTORY = "job.coordinationService.factory";
 
   public JobCoordinatorConfig(Config config) {
     super(config);
@@ -38,14 +37,4 @@ public class JobCoordinatorConfig extends MapConfig {
 
     return jobCoordinatorFactoryClassName;
   }
-
-  public String getJobCoordinationServiceFactoryClassName() {
-    String jobCooridanationFactoryClassName = get(JOB_COORDINATIOIN_SERVICE_FACTORY, "org.apache.samza.zk.ZkCoordinationServiceFactory");
-    if (Strings.isNullOrEmpty(jobCooridanationFactoryClassName)) {
-      throw new ConfigException(
-          String.format("config  '%s' is set to empty. Cannot instantiate coordination utils!", JOB_COORDINATOR_FACTORY));
-    }
-
-    return jobCooridanationFactoryClassName;
-  }
 }
index 784d48d..83ebf52 100644 (file)
@@ -24,9 +24,9 @@ import org.apache.samza.config.Config;
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
-   * @param processorId Identifier for {@link org.apache.samza.processor.StreamProcessor} instance
+   * Return a new instance of {@link JobCoordinator}
    * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
-   * @return An instance of IJobCoordinator
+   * @return {@link JobCoordinator} instance
    */
-  JobCoordinator getJobCoordinator(String processorId, Config config);
+  JobCoordinator getJobCoordinator(Config config);
 }
\ No newline at end of file
index 6329f6c..2ee76dc 100644 (file)
@@ -87,37 +87,36 @@ public class StreamProcessor {
    * <p>
    * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
    *
-   * @param processorId            String identifier for this processor
    * @param config                 Instance of config object - contains all configuration required for processing
    * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
    * @param processorListener         listener to the StreamProcessor life cycle
    */
-  public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
-    this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener);
+    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener);
   }
 
   /**
-   *Same as {@link #StreamProcessor(String, Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
+   *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
    * instances are created using the provided {@link StreamTaskFactory}.
    * @param config - config
    * @param customMetricsReporters metric Reporter
    * @param streamTaskFactory task factory to instantiate the Task
    * @param processorListener  listener to the StreamProcessor life cycle
    */
-  public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) {
-    this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, processorListener);
+    this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener);
   }
 
   /* package private */
-  JobCoordinator getJobCoordinator(String processorId) {
+  JobCoordinator getJobCoordinator() {
     return Util.
         <JobCoordinatorFactory>getObj(
             new JobCoordinatorConfig(config)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, config);
+        .getJobCoordinator(config);
   }
 
   @VisibleForTesting
@@ -133,14 +132,14 @@ public class StreamProcessor {
     this.jobCoordinator.setListener(jobCoordinatorListener);
   }
 
-  private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                           Object taskFactory, StreamProcessorLifecycleListener processorListener) {
     this.taskFactory = taskFactory;
     this.config = config;
     this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
     this.customMetricsReporter = customMetricsReporters;
     this.processorListener = processorListener;
-    this.jobCoordinator = getJobCoordinator(processorId);
+    this.jobCoordinator = getJobCoordinator();
     this.jobCoordinator.setListener(createJobCoordinatorListener());
   }
 
index 6b8e3c7..7a4da7d 100644 (file)
@@ -24,9 +24,7 @@ import org.apache.samza.annotation.InterfaceStability;
 
 /**
  * This class listens to the life cycle events in a {@link StreamProcessor},
- * and triggers the corresponding callbacks.
  */
-
 @InterfaceStability.Evolving
 public interface StreamProcessorLifecycleListener {
   /**
index ca4652e..9fed202 100644 (file)
@@ -1,4 +1,4 @@
-  /*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
 
 package org.apache.samza.runtime;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.execution.ExecutionPlan;
@@ -44,8 +43,8 @@ import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
-import org.apache.samza.util.ClassLoaderHelper;
-import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.apache.samza.zk.ZkCoordinationServiceFactory;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,53 +55,82 @@ import org.slf4j.LoggerFactory;
 public class LocalApplicationRunner extends AbstractApplicationRunner {
 
   private static final Logger log = LoggerFactory.getLogger(LocalApplicationRunner.class);
-  private static final String LATCH_INIT = "init";
-  private static final long LATCH_TIMEOUT_MINUTES = 10; // 10 min timeout
+  // Latch id that's used for awaiting the init of application before creating the StreamProcessors
+  private static final String INIT_LATCH_ID = "init";
+  // Latch timeout is set to 10 min
+  private static final long LATCH_TIMEOUT_MINUTES = 10;
 
   private final String uid;
-  private final CoordinationUtils coordination;
-  private final List<StreamProcessor> processors = new ArrayList<>();
-  private final CountDownLatch latch = new CountDownLatch(1);
-  private final AtomicReference<Throwable> throwable = new AtomicReference<>();
-  private final ConcurrentHashSet<String> processorIds = new ConcurrentHashSet<>();
+  private final CoordinationUtils coordinationUtils;
+  private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
+  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+  private final AtomicInteger numProcessorsToStart = new AtomicInteger();
+  private final AtomicReference<Throwable> failure = new AtomicReference<>();
 
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
-  final class LocalStreamProcessorListener implements StreamProcessorLifecycleListener {
-    public final String processorId;
+  final class LocalStreamProcessorLifeCycleListener implements StreamProcessorLifecycleListener {
+    StreamProcessor processor;
 
-    public LocalStreamProcessorListener(String processorId) {
-      if (StringUtils.isEmpty(processorId)) {
-        throw new NullPointerException("processorId has to be defined in LocalStreamProcessorListener.");
-      }
-      this.processorId = processorId;
+    void setProcessor(StreamProcessor processor) {
+      this.processor = processor;
     }
 
     @Override
     public void onStart() {
-      processorIds.add(processorId);
+      if (numProcessorsToStart.decrementAndGet() == 0) {
+        appStatus = ApplicationStatus.Running;
+      }
     }
 
     @Override
     public void onShutdown() {
-      processorIds.remove(processorId);
-      if (processorIds.isEmpty()) {
-        latch.countDown();
+      processors.remove(processor);
+      processor = null;
+
+      if (processors.isEmpty()) {
+        shutdownAndNotify();
       }
     }
 
     @Override
     public void onFailure(Throwable t) {
-      processorIds.remove(processorId);
-      throwable.compareAndSet(null, t);
-      latch.countDown();
+      processors.remove(processor);
+      processor = null;
+
+      if (failure.compareAndSet(null, t)) {
+        // shutdown the other processors
+        processors.forEach(StreamProcessor::stop);
+      }
+
+      if (processors.isEmpty()) {
+        shutdownAndNotify();
+      }
+    }
+
+    private void shutdownAndNotify() {
+      if (failure.get() != null) {
+        appStatus = ApplicationStatus.unsuccessfulFinish(failure.get());
+      } else {
+        if (appStatus == ApplicationStatus.Running) {
+          appStatus = ApplicationStatus.SuccessfulFinish;
+        } else if (appStatus == ApplicationStatus.New) {
+          // the processor is shutdown before started
+          appStatus = ApplicationStatus.UnsuccessfulFinish;
+        }
+      }
+
+      if (coordinationUtils != null) {
+        coordinationUtils.reset();
+      }
+      shutdownLatch.countDown();
     }
   }
 
-  public LocalApplicationRunner(Config config) throws Exception {
+  public LocalApplicationRunner(Config config) {
     super(config);
     uid = UUID.randomUUID().toString();
-    coordination = getCoordinationUtils();
+    coordinationUtils = createCoordinationUtils();
   }
 
   @Override
@@ -115,30 +143,23 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       // 2. create the necessary streams
       createStreams(plan.getIntermediateStreams());
 
-      // 3. start the StreamProcessors
+      // 3. create the StreamProcessors
       if (plan.getJobConfigs().isEmpty()) {
         throw new SamzaException("No jobs to run.");
       }
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.info("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
-          String processorId = getProcessorId(config);
-          StreamProcessor processor =
-              createStreamProcessor(processorId, jobConfig, app, new LocalStreamProcessorListener(processorId));
-          processor.start();
+          log.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
+          LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
+          StreamProcessor processor = createStreamProcessor(jobConfig, app, listener);
+          listener.setProcessor(processor);
           processors.add(processor);
         });
-      appStatus = ApplicationStatus.Running;
-
-      // 4. block until the processors are done or there is a failure
-      awaitComplete();
+      numProcessorsToStart.set(processors.size());
 
-    } catch (Throwable t) {
-      appStatus = ApplicationStatus.unsuccessfulFinish(t);
-      throw new SamzaException("Failed to run application", t);
-    } finally {
-      if (coordination != null) {
-        coordination.reset();
-      }
+      // 4. start the StreamProcessors
+      processors.forEach(StreamProcessor::start);
+    } catch (Exception e) {
+      throw new SamzaException("Failed to start application", e);
     }
   }
 
@@ -153,31 +174,46 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   /**
-   * Create the Coordination needed by the application runner.
-   * @return {@link CoordinationUtils}
-   * @throws Exception exception
+   * Block until the application finishes
    */
-  /* package private */ CoordinationUtils getCoordinationUtils() throws Exception {
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    String clazz = appConfig.getCoordinationServiceFactoryClass();
-    if (clazz != null) {
-      CoordinationServiceFactory factory = ClassLoaderHelper.fromClassName(clazz);
-      return factory.getCoordinationService(appConfig.getGlobalAppId(), uid, config);
+  public void waitForFinish() {
+    try {
+      shutdownLatch.await();
+    } catch (Exception e) {
+      log.error("Wait is interrupted by exception", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Create the {@link CoordinationUtils} needed by the application runner, or null if it's not configured.
+   * @return an instance of {@link CoordinationUtils}
+   */
+  /* package private */ CoordinationUtils createCoordinationUtils() {
+    String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");
+
+    // TODO: we will need a better way to package the configs with application runner
+    if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
+      ApplicationConfig appConfig = new ApplicationConfig(config);
+      return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config);
     } else {
       return null;
     }
   }
 
   /**
-   * Create intermediate streams.
-   * @param intStreams intermediate {@link StreamSpec}s
-   * @throws Exception exception
+   * Create intermediate streams using {@link org.apache.samza.execution.StreamManager}.
+   * If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader
+   * will create the streams. All the runner processes will wait on the latch that is released after the leader finishes
+   * stream creation.
+   * @param intStreams list of intermediate {@link StreamSpec}s
+   * @throws Exception exception for latch timeout
    */
   /* package private */ void createStreams(List<StreamSpec> intStreams) throws Exception {
     if (!intStreams.isEmpty()) {
-      if (coordination != null) {
-        Latch initLatch = coordination.getLatch(1, LATCH_INIT);
-        coordination.getLeaderElector().tryBecomeLeader(() -> {
+      if (coordinationUtils != null) {
+        Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID);
+        coordinationUtils.getLeaderElector().tryBecomeLeader(() -> {
             getStreamManager().createStreams(intStreams);
             initLatch.countDown();
           });
@@ -191,30 +227,6 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   /**
-   * Generates processorId for each {@link StreamProcessor} using the configured {@link ProcessorIdGenerator}
-   *
-   * @param config Application config
-   * @return String that uniquely represents an instance of {@link StreamProcessor} in the current JVM
-   */
-  /* package private */
-  String getProcessorId(Config config) {
-    // TODO: This check to be removed after 0.13+
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getProcessorId() != null) {
-      return appConfig.getProcessorId();
-    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
-      ProcessorIdGenerator idGenerator =
-          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(),
-              ProcessorIdGenerator.class);
-      return idGenerator.generateProcessorId(config);
-    } else {
-      throw new ConfigException(
-          String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
-              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
-    }
-  }
-
-  /**
    * Create {@link StreamProcessor} based on {@link StreamApplication} and the config
    * @param config config
    * @param app {@link StreamApplication}
@@ -222,34 +234,19 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
    */
   /* package private */
   StreamProcessor createStreamProcessor(
-      String processorId,
       Config config,
       StreamApplication app,
       StreamProcessorLifecycleListener listener) {
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, this);
     if (taskFactory instanceof StreamTaskFactory) {
       return new StreamProcessor(
-          processorId, config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
+          config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
     } else if (taskFactory instanceof AsyncStreamTaskFactory) {
       return new StreamProcessor(
-          processorId, config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
+          config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
     } else {
       throw new SamzaException(String.format("%s is not a valid task factory",
           taskFactory.getClass().getCanonicalName()));
     }
   }
-
-  /**
-   * Wait for all the {@link StreamProcessor}s to finish.
-   * @throws Throwable exceptions thrown by the processors.
-   */
-  /* package private */ void awaitComplete() throws Throwable {
-    latch.await();
-
-    if (throwable.get() != null) {
-      throw throwable.get();
-    } else {
-      appStatus = ApplicationStatus.SuccessfulFinish;
-    }
-  }
 }
index 5daecda..309d8c8 100644 (file)
@@ -128,5 +128,4 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       throw new SamzaException("Failed to get status for application", t);
     }
   }
-
 }
index 61ead18..68a89ab 100644 (file)
 package org.apache.samza.standalone;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
-import org.apache.samza.util.SystemClock;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,8 +68,8 @@ public class StandaloneJobCoordinator implements JobCoordinator {
   private final Config config;
   private JobCoordinatorListener coordinatorListener = null;
 
-  public StandaloneJobCoordinator(String processorId, Config config) {
-    this.processorId = processorId;
+  public StandaloneJobCoordinator(Config config) {
+    this.processorId = createProcessorId(config);
     this.config = config;
   }
 
@@ -102,11 +104,6 @@ public class StandaloneJobCoordinator implements JobCoordinator {
   }
 
   @Override
-  public String getProcessorId() {
-    return processorId;
-  }
-
-  @Override
   public void setListener(JobCoordinatorListener listener) {
     this.coordinatorListener = listener;
   }
@@ -136,4 +133,25 @@ public class StandaloneJobCoordinator implements JobCoordinator {
      */
     return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
   }
+
+  @Override
+  public String getProcessorId() {
+    return this.processorId;
+  }
+
+  private String createProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(String
+          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
 }
index 8c27ebe..764dc4c 100644 (file)
@@ -22,9 +22,9 @@ import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 
-public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory {
+public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config) {
-    return new StandaloneJobCoordinator(processorId, config);
+  public JobCoordinator getJobCoordinator(Config config) {
+    return new StandaloneJobCoordinator(config);
   }
 }
\ No newline at end of file
index d2d0199..08f779f 100644 (file)
@@ -27,20 +27,19 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JavaSystemConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
-import org.apache.samza.util.SystemClock;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,21 +61,15 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
 
-  public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer) {
-    this.processorId = processorId;
+  public ZkJobCoordinator(Config config, ScheduleAfterDebounceTime debounceTimer) {
     this.debounceTimer = debounceTimer;
     this.config = config;
-
-    this.coordinationUtils = Util.
-        <CoordinationServiceFactory>getObj(
-            new JobCoordinatorConfig(config)
-                .getJobCoordinationServiceFactoryClassName())
+    this.processorId = createProcessorId(config);
+    this.coordinationUtils = new ZkCoordinationServiceFactory()
         .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
-
     this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
     this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
-
-    streamMetadataCache = getStreamMetadataCache();
+    this.streamMetadataCache = getStreamMetadataCache();
   }
 
   private StreamMetadataCache getStreamMetadataCache() {
@@ -114,11 +107,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   }
 
   @Override
-  public String getProcessorId() {
-    return processorId;
-  }
-
-  @Override
   public void setListener(JobCoordinatorListener listener) {
     this.coordinatorListener = listener;
   }
@@ -128,6 +116,11 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     return newJobModel;
   }
 
+  @Override
+  public String getProcessorId() {
+    return processorId;
+  }
+
   //////////////////////////////////////////////// LEADER stuff ///////////////////////////
   @Override
   public void onBecomeLeader() {
@@ -187,6 +180,22 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
   }
 
+  private String createProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(String
+          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+
   /**
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
index a7239eb..e02e504 100644 (file)
@@ -27,17 +27,13 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   /**
    * Method to instantiate an implementation of JobCoordinator
    *
-   * @param processorId - id of this processor
    * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
    * @return An instance of IJobCoordinator
    */
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config) {
+  public JobCoordinator getJobCoordinator(Config config) {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
 
-    return new ZkJobCoordinator(
-        processorId,
-        config,
-        debounceTimer);
+    return new ZkJobCoordinator(config, debounceTimer);
   }
 }
index 6cd3105..2d2bf16 100644 (file)
@@ -182,7 +182,7 @@ public class TestLocalApplicationRunner {
     };
     when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector);
     when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch);
-    doReturn(coordinationUtils).when(spy).getCoordinationUtils();
+    doReturn(coordinationUtils).when(spy).createCoordinationUtils();
 
     try {
       spy.run(app);
@@ -235,13 +235,14 @@ public class TestLocalApplicationRunner {
     doAnswer(i ->
       {
         StreamProcessorLifecycleListener listener = captor.getValue();
+        listener.onStart();
         listener.onShutdown();
         return null;
       }).when(sp).start();
 
 
     LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(anyString(), anyObject(), anyObject(), captor.capture());
+    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
 
     spy.run(app);
 
@@ -293,7 +294,7 @@ public class TestLocalApplicationRunner {
 
 
     LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(anyString(), anyObject(), anyObject(), captor.capture());
+    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
 
     try {
       spy.run(app);
index 75609aa..b0bfbb5 100644 (file)
@@ -47,11 +47,26 @@ import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.apache.samza.test.processor.IdentityStreamTask.endLatch;
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
+
+  private StreamProcessorLifecycleListener listener;
+
+  @Before
+  public void setup() {
+    listener = mock(StreamProcessorLifecycleListener.class);
+    doNothing().when(listener).onStart();
+    doNothing().when(listener).onShutdown();
+    doNothing().when(listener).onFailure(anyObject());
+  }
+
   /**
    * Testing a basic identity stream task - reads data from a topic and writes it to another topic
    * (without any modifications)
@@ -72,26 +87,10 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     // TopicExistsException since StreamProcessor auto-creates them.
     createTopics(inputTopic, outputTopic);
     final StreamProcessor processor = new StreamProcessor(
-        "1",
         new MapConfig(configs),
         new HashMap<>(),
         IdentityStreamTask::new,
-        new StreamProcessorLifecycleListener() {
-          @Override
-          public void onStart() {
-
-          }
-
-          @Override
-          public void onShutdown() {
-
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-
-          }
-        });
+        listener);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -112,26 +111,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     createTopics(inputTopic, outputTopic);
     final StreamTaskFactory stf = IdentityStreamTask::new;
     final StreamProcessor processor =
-        new StreamProcessor("1", configs, new HashMap<>(), stf, new StreamProcessorLifecycleListener() {
-          /**
-           * Callback when the {@link StreamProcessor} is started
-           */
-          @Override
-          public void onStart() { }
-          /**
-           * Callback when the {@link StreamProcessor} is shut down.
-           */
-          @Override
-          public void onShutdown() { }
-
-          /**
-           * Callback when the {@link StreamProcessor} fails
-           *
-           * @param t exception of the failure
-           */
-          @Override
-          public void onFailure(Throwable t) { }
-        });
+        new StreamProcessor(configs, new HashMap<>(), stf, listener);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -153,26 +133,10 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     createTopics(inputTopic, outputTopic);
     final AsyncStreamTaskFactory stf = () -> new AsyncStreamTaskAdapter(new IdentityStreamTask(), executorService);
     final StreamProcessor processor = new StreamProcessor(
-        "1",
         configs,
         new HashMap<>(),
         stf,
-        new StreamProcessorLifecycleListener() {
-          @Override
-          public void onStart() {
-
-          }
-
-          @Override
-          public void onShutdown() {
-
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-
-          }
-        });
+        listener);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -195,26 +159,10 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final Config configs = new MapConfig(configMap);
 
     StreamProcessor processor = new StreamProcessor(
-        "1",
         configs,
         new HashMap<>(),
         (StreamTaskFactory) null,
-        new StreamProcessorLifecycleListener() {
-          @Override
-          public void onStart() {
-
-          }
-
-          @Override
-          public void onShutdown() {
-
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-
-          }
-        });
+        listener);
     run(processor, endLatch);
   }