SAMZA-1954: User provided configuration should have higher precedence than Samza...
authorPrateek Maheshwari <pmaheshwari@apache.org>
Tue, 16 Oct 2018 01:55:28 +0000 (18:55 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Tue, 16 Oct 2018 01:55:28 +0000 (18:55 -0700)
Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Cameron Lee <calee@linkedin.com>

Closes #728 from prateekm/config-precedence

23 files changed:
samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemoryInputDescriptor.java
samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java

index 6d18afc..aa5c8d2 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.samza.serializers.Serde;
  * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
  * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index 5fe3a98..1d81525 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.samza.serializers.Serde;
  * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
  * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index 59b2a12..eb86877 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.serializers.Serde;
  * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
  * Additional system specific properties may be provided using {@link #withSystemConfigs}
  * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
  */
 public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
     implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
index 2c0ca88..fd7a50c 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.system.SystemStreamMetadata.OffsetType;
 /**
  * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  * @param <SubClass> type of the concrete sub-class
index 70a5d0f..898be1e 100644 (file)
@@ -23,7 +23,7 @@ import org.apache.samza.serializers.Serde;
 /**
  * The base descriptor for an output stream. Allows setting properties that are common to all output streams.
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  * @param <SubClass> type of the concrete sub-class
index d2b25f9..e8e586f 100644 (file)
@@ -31,7 +31,7 @@ import org.apache.samza.serializers.Serde;
 /**
  * The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptors.
  *
  * @param <StreamMessageType> type of messages in this stream.
  * @param <SubClass> type of the concrete sub-class
index 4b93a32..9db2544 100644 (file)
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The base descriptor for a system. Allows setting properties that are common to all systems.
  * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
  * <p>
  * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
  * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
index cce716c..462cc05 100644 (file)
@@ -31,11 +31,10 @@ import org.apache.samza.system.eventhub.EventHubConfig;
 
 /**
  * A descriptor for the Event Hubs output stream
- *<p>
- *   An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
- *</p>
- * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
- * in configuration.
+ * <p>
+ * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ * <p>
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream
  */
index cd17033..ddbf79c 100644 (file)
@@ -31,10 +31,9 @@ import org.apache.samza.system.eventhub.EventHubConfig;
 /**
  * A descriptor for an Event Hubs output stream
  * <p>
- *   An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
- * </p>
- * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
- * in configuration.
+ * An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
+ * <p>
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream
  */
index 4e292d9..80bdfae 100644 (file)
@@ -33,7 +33,7 @@ import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.Partitio
 /**
  * A descriptor for a Event Hubs system.
  * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
  */
 public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
   private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
index 676d28e..70c9b23 100644 (file)
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -66,67 +67,80 @@ import org.slf4j.LoggerFactory;
 
   static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
 
-  static JobConfig mergeJobConfig(Config originalConfig, Config generatedConfig) {
-    JobConfig jobConfig = new JobConfig(originalConfig);
-    String jobNameAndId = JobNode.createJobNameAndId(jobConfig.getName().get(), jobConfig.getJobId());
-    return new JobConfig(Util.rewriteConfig(extractScopedConfig(originalConfig, generatedConfig,
-        String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), jobNameAndId))));
+  static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
+    Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
+    originalConfig.forEach((k, v) -> {
+        if (generatedConfig.containsKey(k) &&
+            !Objects.equals(generatedConfig.get(k), v)) {
+          LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
+              k, generatedConfig.get(k), v);
+        }
+        mergedConfig.put(k, v);
+      });
+
+    return Util.rewriteConfig(new MapConfig(mergedConfig));
   }
 
   JobConfig generateJobConfig(JobNode jobNode, String executionPlanJson) {
-    Map<String, String> configs = new HashMap<>();
+    if (jobNode.isLegacyTaskApplication()) {
+      return new JobConfig(jobNode.getConfig());
+    }
+
+    Map<String, String> generatedConfig = new HashMap<>();
     // set up job name and job ID
-    configs.put(JobConfig.JOB_NAME(), jobNode.getJobName());
-    configs.put(JobConfig.JOB_ID(), jobNode.getJobId());
+    generatedConfig.put(JobConfig.JOB_NAME(), jobNode.getJobName());
+    generatedConfig.put(JobConfig.JOB_ID(), jobNode.getJobId());
 
     Map<String, StreamEdge> inEdges = jobNode.getInEdges();
     Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
     Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
     List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
     Map<String, TableSpec> reachableTables = getReachableTables(reachableOperators, jobNode);
-    Config config = jobNode.getConfig();
+
+    // config passed by the JobPlanner. user-provided + system-stream descriptor config + misc. other config
+    Config originalConfig = jobNode.getConfig();
 
     // check all inputs to the node for broadcast and input streams
     final Set<String> inputs = new HashSet<>();
-    final Set<String> broadcasts = new HashSet<>();
+    final Set<String> broadcastInputs = new HashSet<>();
     for (StreamEdge inEdge : inEdges.values()) {
       String formattedSystemStream = inEdge.getName();
       if (inEdge.isBroadcast()) {
-        broadcasts.add(formattedSystemStream + "#0");
+        broadcastInputs.add(formattedSystemStream + "#0");
       } else {
         inputs.add(formattedSystemStream);
       }
     }
 
-    configureBroadcastInputs(configs, config, broadcasts);
+    configureBroadcastInputs(generatedConfig, originalConfig, broadcastInputs);
 
     // compute window and join operator intervals in this node
-    configureWindowInterval(configs, config, reachableOperators);
+    configureWindowInterval(generatedConfig, originalConfig, reachableOperators);
 
     // set store configuration for stateful operators.
-    stores.forEach(sd -> configs.putAll(sd.getStorageConfigs()));
+    stores.forEach(sd -> generatedConfig.putAll(sd.getStorageConfigs()));
 
     // set the execution plan in json
-    configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
+    generatedConfig.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
 
     // write intermediate input/output streams to configs
-    inEdges.values().stream().filter(StreamEdge::isIntermediate).forEach(edge -> configs.putAll(edge.generateConfig()));
+    inEdges.values().stream().filter(StreamEdge::isIntermediate)
+        .forEach(intermediateEdge -> generatedConfig.putAll(intermediateEdge.generateConfig()));
 
     // write serialized serde instances and stream, store, and table serdes to configs
     // serde configuration generation has to happen before table configuration, since the serde configuration
     // is required when generating configurations for some TableProvider (i.e. local store backed tables)
-    configureSerdes(configs, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
+    configureSerdes(generatedConfig, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
 
     // generate table configuration and potential side input configuration
-    configureTables(configs, config, reachableTables, inputs);
+    configureTables(generatedConfig, originalConfig, reachableTables, inputs);
 
-    // finalize the task.inputs configuration
-    configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
+    // generate the task.inputs configuration
+    generatedConfig.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs));
 
-    LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), configs);
+    LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), generatedConfig);
 
-    // apply configure rewriters and user configure overrides
-    return applyConfigureRewritersAndOverrides(configs, config, jobNode);
+    return new JobConfig(mergeConfig(originalConfig, generatedConfig));
   }
 
   private Map<String, TableSpec> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
@@ -140,9 +154,9 @@ import org.slf4j.LoggerFactory;
     if (broadcastStreams.isEmpty()) {
       return;
     }
-    final String taskBroadcasts = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
-    if (StringUtils.isNoneEmpty(taskBroadcasts)) {
-      broadcastStreams.add(taskBroadcasts);
+    String broadcastInputs = config.get(TaskConfigJava.BROADCAST_INPUT_STREAMS);
+    if (StringUtils.isNotBlank(broadcastInputs)) {
+      broadcastStreams.add(broadcastInputs);
     }
     configs.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcastStreams));
   }
@@ -155,12 +169,10 @@ import org.slf4j.LoggerFactory;
     }
 
     // set triggering interval if a window or join is defined. Only applies to high-level applications
-    if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) {
-      long triggerInterval = computeTriggerInterval(reachableOperators);
-      LOG.info("Using triggering interval: {}", triggerInterval);
+    long triggerInterval = computeTriggerInterval(reachableOperators);
+    LOG.info("Using triggering interval: {}", triggerInterval);
 
-      configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
-    }
+    configs.put(TaskConfig.WINDOW_MS(), String.valueOf(triggerInterval));
   }
 
   /**
@@ -190,68 +202,27 @@ import org.slf4j.LoggerFactory;
     return MathUtil.gcd(candidateTimerIntervals);
   }
 
-  private JobConfig applyConfigureRewritersAndOverrides(Map<String, String> configs, Config config, JobNode jobNode) {
-    // Disallow user specified job inputs/outputs. This info comes strictly from the user application.
-    Map<String, String> allowedConfigs = new HashMap<>(config);
-    if (!jobNode.isLegacyTaskApplication()) {
-      if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
-        LOG.warn("Specifying task inputs in configuration is not allowed for SamzaApplication. "
-            + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
-        allowedConfigs.remove(TaskConfig.INPUT_STREAMS());
-      }
-    }
-
-    LOG.debug("Job {} has allowed configs {}", jobNode.getJobNameAndId(), allowedConfigs);
-    return mergeJobConfig(new MapConfig(allowedConfigs), new MapConfig(configs));
-  }
-
-  /**
-   * This function extract the subset of configs from the full config, and use it to override the generated configs
-   * from the job.
-   * @param fullConfig full config
-   * @param generatedConfig config generated for the job
-   * @param configPrefix prefix to extract the subset of the config overrides
-   * @return config that merges the generated configs and overrides
-   */
-  private static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
-    Config scopedConfig = fullConfig.subset(configPrefix);
-
-    Config[] configPrecedence = new Config[] {fullConfig, generatedConfig, scopedConfig};
-    // Strip empty configs so they don't override the configs before them.
-    Map<String, String> mergedConfig = new HashMap<>();
-    for (Map<String, String> config : configPrecedence) {
-      for (Map.Entry<String, String> property : config.entrySet()) {
-        String value = property.getValue();
-        if (!(value == null || value.isEmpty())) {
-          mergedConfig.put(property.getKey(), property.getValue());
-        }
-      }
-    }
-    scopedConfig = new MapConfig(mergedConfig);
-    LOG.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig);
-
-    return scopedConfig;
-  }
-
   private List<StoreDescriptor> getStoreDescriptors(Collection<OperatorSpec> reachableOperators) {
     return reachableOperators.stream().filter(operatorSpec -> operatorSpec instanceof StatefulOperatorSpec)
         .map(operatorSpec -> ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors()).flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
 
-  private void configureTables(Map<String, String> configs, Config config, Map<String, TableSpec> tables, Set<String> inputs) {
-    configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(configs),
-        tables.values().stream().collect(Collectors.toList())));
+  private void configureTables(Map<String, String> generatedConfig, Config originalConfig,
+      Map<String, TableSpec> tables, Set<String> inputs) {
+    generatedConfig.putAll(
+        TableConfigGenerator.generateConfigsForTableSpecs(
+            new MapConfig(generatedConfig), new ArrayList<>(tables.values())));
 
     // Add side inputs to the inputs and mark the stream as bootstrap
     tables.values().forEach(tableSpec -> {
         List<String> sideInputs = tableSpec.getSideInputs();
         if (sideInputs != null && !sideInputs.isEmpty()) {
           sideInputs.stream()
-              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
+              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(originalConfig, sideInput))
               .forEach(systemStream -> {
                   inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
-                  configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
+                  generatedConfig.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
                       systemStream.getSystem(), systemStream.getStream()), "true");
                 });
         }
index dc0fc59..7ea7367 100644 (file)
@@ -32,9 +32,9 @@ import org.apache.samza.application.LegacyTaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,11 +49,11 @@ public abstract class JobPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class);
 
   protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
-  protected final Config config;
+  protected final Config userConfig;
 
   JobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
     this.appDesc = descriptor;
-    this.config = descriptor.getConfig();
+    this.userConfig = descriptor.getConfig();
   }
 
   public abstract List<JobConfig> prepareJobs();
@@ -70,35 +70,22 @@ public abstract class JobPlanner {
 
   /* package private */
   ExecutionPlan getExecutionPlan(String runId) {
+    Map<String, String> generatedConfig = getGeneratedConfig(runId);
 
-    // update application configs
-    Map<String, String> cfg = new HashMap<>();
-    if (StringUtils.isNoneEmpty(runId)) {
-      cfg.put(ApplicationConfig.APP_RUN_ID, runId);
+    // merge user-provided configuration with generated configuration. generated configuration has lower priority.
+    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
+    Map<String, String> allowedUserConfig = new HashMap<>(userConfig);
+    if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
+      LOG.warn("SamzaApplications should not specify task.inputs in configuration. " +
+          "Ignoring configured value of " + userConfig.get(TaskConfig.INPUT_STREAMS()));
+      allowedUserConfig.remove(TaskConfig.INPUT_STREAMS()); // must be set using descriptors or operators
     }
 
-    StreamConfig streamConfig = new StreamConfig(config);
-    Set<String> inputStreams = new HashSet<>(appDesc.getInputStreamIds());
-    inputStreams.removeAll(appDesc.getOutputStreamIds());
-    ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
-        ? ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM;
-    cfg.put(ApplicationConfig.APP_MODE, mode.name());
-
-    // merge user-provided configuration with input/output descriptor generated configuration
-    // descriptor generated configuration has higher priority
-    Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(appDesc);
-    cfg.putAll(systemStreamConfigs);
-
-    // adding app.class in the configuration, unless it is LegacyTaskApplication
-    if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
-      cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
-    }
+    Config mergedConfig = JobNodeConfigurationGenerator.mergeConfig(allowedUserConfig, generatedConfig);
 
-    // create the physical execution plan and merge with overrides. This works for a single-stage job now
-    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
-    Config mergedConfig = JobNodeConfigurationGenerator.mergeJobConfig(config, new MapConfig(cfg));
     // creating the StreamManager to get all input/output streams' metadata for planning
     StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
+
     try {
       ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager);
       return planner.plan(appDesc);
@@ -128,7 +115,32 @@ public abstract class JobPlanner {
     }
   }
 
-  private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+  private Map<String, String> getGeneratedConfig(String runId) {
+    Map<String, String> generatedConfig = new HashMap<>();
+    if (StringUtils.isNoneEmpty(runId)) {
+      generatedConfig.put(ApplicationConfig.APP_RUN_ID, runId);
+    }
+
+    StreamConfig streamConfig = new StreamConfig(userConfig);
+    Set<String> inputStreamIds = new HashSet<>(appDesc.getInputStreamIds());
+    inputStreamIds.removeAll(appDesc.getOutputStreamIds()); // exclude intermediate streams
+    ApplicationConfig.ApplicationMode mode =
+        inputStreamIds.stream().allMatch(streamConfig::getIsBounded)
+            ? ApplicationConfig.ApplicationMode.BATCH
+            : ApplicationConfig.ApplicationMode.STREAM;
+    generatedConfig.put(ApplicationConfig.APP_MODE, mode.name());
+
+    Map<String, String> systemStreamConfigs = generateSystemStreamConfigs(appDesc);
+    generatedConfig.putAll(systemStreamConfigs);
+
+    // adding app.class in the configuration, unless it is LegacyTaskApplication
+    if (!LegacyTaskApplication.class.getName().equals(appDesc.getAppClass().getName())) {
+      generatedConfig.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
+    }
+    return generatedConfig;
+  }
+
+  private Map<String, String> generateSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
     Map<String, String> systemStreamConfigs = new HashMap<>();
     appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
     appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
index 6ca5f3d..9e2f745 100644 (file)
@@ -107,10 +107,10 @@ public class LocalJobPlanner extends JobPlanner {
     LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
     // Move the scope of coordination utils within stream creation to address long idle connection problem.
     // Refer SAMZA-1385 for more details
-    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
-    String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
+    String coordinationId = new ApplicationConfig(userConfig).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
     CoordinationUtils coordinationUtils =
-        jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config);
+        jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, userConfig);
     if (coordinationUtils == null) {
       LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
       // each application process will try creating the streams, which
index 13b29df..c51fd85 100644 (file)
@@ -87,7 +87,7 @@ public class RemoteJobPlanner extends JobPlanner {
   }
 
   private Config getConfigFromPrevRun() {
-    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(userConfig, new MetricsRegistryMap());
     consumer.register();
     consumer.start();
     consumer.bootstrap();
index d7b71b5..4f19ade 100644 (file)
@@ -39,7 +39,6 @@ object JobConfig {
    */
   val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters
   val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
-  val CONFIG_OVERRIDE_JOBS_PREFIX = "jobs.%s."
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
   val SAMZA_FWK_PATH = "samza.fwk.path"
index f49958c..6d017cb 100644 (file)
@@ -672,9 +672,8 @@ public class TestExecutionPlanner {
   }
 
   @Test
-  public void testTriggerIntervalWithInvalidWindowMs() {
+  public void testTriggerIntervalWithNoWindowMs() {
     Map<String, String> map = new HashMap<>(config);
-    map.put(TaskConfig.WINDOW_MS(), "-1");
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
     Config cfg = new MapConfig(map);
 
index dda0ee1..404a39b 100644 (file)
@@ -239,59 +239,9 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
   }
 
   @Test
-  public void testTaskInputsRemovedFromOriginalConfig() {
-    Map<String, String> configs = new HashMap<>(mockConfig);
-    configs.put(TaskConfig.INPUT_STREAMS(), "not.allowed1,not.allowed2");
-    mockConfig = spy(new MapConfig(configs));
-
-    mockStreamAppDesc = new StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde), mockConfig);
-    configureJobNode(mockStreamAppDesc);
-
-    JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
-    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
-    Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
-    validateJobConfig(expectedConfig, jobConfig);
-  }
-
-  @Test
-  public void testTaskInputsRetainedForLegacyTaskApplication() {
-    Map<String, String> originConfig = new HashMap<>(mockConfig);
-    originConfig.put(TaskConfig.INPUT_STREAMS(), "must.retain1,must.retain2");
-    mockConfig = new MapConfig(originConfig);
-    TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig);
-    configureJobNode(taskAppDesc);
-
-    // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode
-    JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
-    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "");
-    // jobConfig should be exactly the same as original config
-    Map<String, String> generatedConfig = new HashMap<>(jobConfig);
-    assertEquals(originConfig, generatedConfig);
-  }
-
-  @Test
-  public void testOverrideConfigs() {
-    Map<String, String> configs = new HashMap<>(mockConfig);
-    String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId());
-    String overrideCfgKey = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + streamCfgToOverride;
-    configs.put(overrideCfgKey, "customized-system");
-    mockConfig = spy(new MapConfig(configs));
-    mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig);
-    configureJobNode(mockStreamAppDesc);
-
-    JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator();
-    JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson");
-    Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges());
-    validateJobConfig(expectedConfig, jobConfig);
-    assertEquals("customized-system", jobConfig.get(streamCfgToOverride));
-  }
-
-  @Test
-  public void testConfigureRewriter() {
+  public void testConfigRewriter() {
     Map<String, String> configs = new HashMap<>(mockConfig);
     String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId());
-    String overrideCfgKey = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + streamCfgToOverride;
-    configs.put(overrideCfgKey, "customized-system");
     configs.put(String.format(JobConfig.CONFIG_REWRITER_CLASS(), "mock"), MockConfigRewriter.class.getName());
     configs.put(JobConfig.CONFIG_REWRITERS(), "mock");
     configs.put(String.format("job.config.rewriter.mock.%s", streamCfgToOverride), "rewritten-system");
index 1896fd3..fb279ab 100644 (file)
@@ -32,7 +32,7 @@ import org.apache.samza.serializers.Serde;
  * <p>
  * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index 0ec5ce7..f13352c 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.serializers.Serde;
  * <p>
  * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
  * <p>
- * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ * Stream properties provided in configuration override corresponding properties configured using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index 0c6eaeb..6fb8c1c 100644 (file)
@@ -36,7 +36,7 @@ import org.apache.samza.system.kafka.KafkaSystemFactory;
 /**
  * A descriptor for a Kafka system.
  * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
  */
 @SuppressWarnings("unchecked")
 public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescriptor>
index ba7128a..531b0ef 100644 (file)
@@ -168,8 +168,7 @@ public class TestRunner {
   public TestRunner addConfig(String key, String value) {
     Preconditions.checkNotNull(key);
     Preconditions.checkNotNull(value);
-    String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
-    configs.put(String.format("%s%s", configPrefix, key), value);
+    configs.put(key, value);
     return this;
   }
 
@@ -180,8 +179,7 @@ public class TestRunner {
    */
   public TestRunner addConfig(Map<String, String> config) {
     Preconditions.checkNotNull(config);
-    String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
-    config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value));
+    configs.putAll(config);
     return this;
   }
 
@@ -204,10 +202,6 @@ public class TestRunner {
     return this;
   }
 
-  private String getJobNameAndId() {
-    return String.format("%s-%s", JOB_NAME, configs.getOrDefault(JobConfig.JOB_ID(), "1"));
-  }
-
   /**
    * Adds the provided input stream with mock data to the test application. Default configs and user added configs have
    * a higher precedence over system and stream descriptor generated configs.
@@ -348,12 +342,11 @@ public class TestRunner {
 
   /**
    * Creates an in memory stream with {@link InMemorySystemFactory} and feeds its partition with stream of messages
-   * @param partitonData key of the map represents partitionId and value represents
-   *                 messages in the partition
+   * @param partitionData key of the map represents partitionId and value represents messages in the partition
    * @param descriptor describes a stream to initialize with the in memory system
    */
   private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor,
-      Map<Integer, Iterable<StreamMessageType>> partitonData) {
+      Map<Integer, Iterable<StreamMessageType>> partitionData) {
     String systemName = descriptor.getSystemName();
     String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
     if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
@@ -366,13 +359,13 @@ public class TestRunner {
     imsd.withInMemoryScope(this.inMemoryScope);
     addConfig(descriptor.toConfig());
     addConfig(descriptor.getSystemDescriptor().toConfig());
-    StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size());
+    StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size());
     SystemFactory factory = new InMemorySystemFactory();
     Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
     factory.getAdmin(systemName, config).createStream(spec);
     SystemProducer producer = factory.getProducer(systemName, config, null);
     SystemStream sysStream = new SystemStream(systemName, streamName);
-    partitonData.forEach((partitionId, partition) -> {
+    partitionData.forEach((partitionId, partition) -> {
         partition.forEach(e -> {
             Object key = e instanceof KV ? ((KV) e).getKey() : null;
             Object value = e instanceof KV ? ((KV) e).getValue() : e;
index 0e49550..c446083 100644 (file)
@@ -25,8 +25,8 @@ import org.apache.samza.serializers.NoOpSerde;
 /**
  * A descriptor for an in memory stream of messages that can either have single or multiple partitions.
  * <p>
- *  An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
- * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ *
  * @param <StreamMessageType> type of messages in input stream
  */
 public class InMemoryInputDescriptor<StreamMessageType>
index 96a8aca..a74d0ba 100644 (file)
@@ -31,8 +31,8 @@ import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.config.JavaSystemConfig;
 
 /**
- * A descriptor for InMemorySystem.
- * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * Descriptor for an InMemorySystem.
+ * System properties provided in configuration override corresponding properties configured using a descriptor.
  * <p>
  * Following system level configs are set by default
  * <ol>
@@ -44,20 +44,6 @@ import org.apache.samza.config.JavaSystemConfig;
 public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
     implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
   private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
-  /**
-   * <p>
-   * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
-   * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
-   * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
-   * scope have the highest precedence.
-   *
-   * For this case, it generates following overridden configs
-   * <ol>
-   *      <li>"jobs.<job-name>.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
-   *      <li>"jobs.<job-name>.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
-   * </ol>
-   *
-   **/
   private String inMemoryScope;
 
   /**