Minor fix to some config variable names and accessor methods.
author: Prateek Maheshwari <pmaheshwari@apache.org>
Tue, 4 Dec 2018 22:08:57 +0000 (14:08 -0800)
committer: Jagadish <jvenkatraman@linkedin.com>
Tue, 4 Dec 2018 22:08:57 +0000 (14:08 -0800)
Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Jagadish <jagadish@apache.org>

Closes #840 from prateekm/fix-config-names

samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index a64589f..ba5d932 100644 (file)
@@ -36,9 +36,9 @@ object TaskConfig {
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
   val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY // class name to use when sending offset checkpoints
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
-  val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
-  val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
-  val DROP_PRODUCER_ERROR = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send
+  val DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
+  val DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
+  val DROP_PRODUCER_ERRORS = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send
   val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
   val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
   val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
@@ -115,11 +115,11 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
 
-  def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR)
+  def getDropDeserializationErrors = getBoolean(TaskConfig.DROP_DESERIALIZATION_ERRORS, false)
 
-  def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
+  def getDropSerializationErrors = getBoolean(TaskConfig.DROP_SERIALIZATION_ERRORS, false)
 
-  def getDropProducerError = getBoolean(TaskConfig.DROP_PRODUCER_ERROR, false)
+  def getDropProducerErrors = getBoolean(TaskConfig.DROP_PRODUCER_ERRORS, false)
 
   def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
 
index 94bc138..03effe6 100644 (file)
@@ -425,15 +425,8 @@ object SamzaContainer extends Logging {
     val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
-    val dropDeserializationError = config.getDropDeserialization match {
-      case Some(dropError) => dropError.toBoolean
-      case _ => false
-    }
-
-    val dropSerializationError = config.getDropSerialization match {
-      case Some(dropError) => dropError.toBoolean
-      case _ => false
-    }
+    val dropDeserializationError = config.getDropDeserializationErrors
+    val dropSerializationError = config.getDropSerializationErrors
 
     val pollIntervalMs = config
       .getPollIntervalMs
index f314f92..cca7a6b 100644 (file)
@@ -83,7 +83,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
       getProducer,
       metrics,
-      dropProducerExceptions = config.getDropProducerError)
+      dropProducerExceptions = config.getDropProducerErrors)
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
index d588831..cf7c5e8 100644 (file)
@@ -101,7 +101,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
       getProducer,
       metrics,
-      dropProducerExceptions = config.getDropProducerError)
+      dropProducerExceptions = config.getDropProducerErrors)
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
index 5278284..b4a97f7 100644 (file)
@@ -279,7 +279,7 @@ public class StreamAppender extends AppenderSkeleton {
       throw new SamzaException("can not read the config", e);
     }
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
+    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
 
     return config;
   }
index e1d6dd3..28f759e 100644 (file)
@@ -300,7 +300,7 @@ public class StreamAppender extends AbstractAppender {
       throw new SamzaException("can not read the config", e);
     }
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
+    config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
 
     return config;
   }
index 7411318..78dad0d 100644 (file)
@@ -206,7 +206,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         .put(JobConfig.JOB_NAME(), appName)
         .put(JobConfig.JOB_ID(), appId)
         .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
-        .put(TaskConfig.DROP_PRODUCER_ERROR(), "true")
+        .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
         .build();