SAMZA-1978: Use samza offset reset value in kafka consumer
authorBoris S <bshkolnik@linkedin.com>
Sat, 10 Nov 2018 00:23:05 +0000 (16:23 -0800)
committerBoris S <bshkolnik@linkedin.com>
Sat, 10 Nov 2018 00:23:05 +0000 (16:23 -0800)
Author: Boris S <bshkolnik@linkedin.com>
Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes #753 from sborya/UseSamazResetInKafka

samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java

index 96c2c4d..fde98c6 100644 (file)
@@ -40,6 +40,9 @@ public class JavaSystemConfig extends MapConfig {
   private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream.";
   private static final String EMPTY = "";
 
+  public static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
+  public static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
+
   public JavaSystemConfig(Config config) {
     super(config);
   }
@@ -122,4 +125,26 @@ public class JavaSystemConfig extends MapConfig {
   public Config getDefaultStreamProperties(String systemName) {
     return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName), true);
   }
+
+  /**
+   * Get system offset default value.
+   * systems.'system'.default.stream.samza.offset.default is the config.
+   * systems.'system'.samza.offset.default is the deprecated setting, but needs to be checked for backward compatibility.
+   * @param systemName get config value for this system.
+   * @return value of system reset or default ("upcoming") if none set.
+   */
+  public String getSystemOffsetDefault(String systemName) {
+    // first check stream system default
+    String systemOffsetDefault = get(String.format("systems.%s.default.stream.samza.offset.default", systemName));
+
+    // if not set, check the deprecated setting
+    if (StringUtils.isBlank(systemOffsetDefault)) {
+      systemOffsetDefault = get(String.format("systems.%s.samza.offset.default", systemName));
+      if (StringUtils.isBlank(systemOffsetDefault)) {
+        return SAMZA_SYSTEM_OFFSET_UPCOMING;
+      }
+    }
+
+    return systemOffsetDefault;
+  }
 }
index 53d5e98..18d2b93 100644 (file)
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemAdmins
 import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, JavaSystemConfig}
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.container.TaskName
@@ -86,11 +86,11 @@ object OffsetManager extends Logging {
         case (systemStream, systemStreamMetadata) =>
           // Get default offset.
           val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
-          val systemDefaultOffset = config.getDefaultSystemOffset(systemStream.getSystem)
+          val systemDefaultOffset = new JavaSystemConfig(config).getSystemOffsetDefault(systemStream.getSystem)
           val defaultOffsetType = if (streamDefaultOffset.isDefined) {
             OffsetType.valueOf(streamDefaultOffset.get.toUpperCase)
-          } else if (systemDefaultOffset.isDefined) {
-            OffsetType.valueOf(systemDefaultOffset.get.toUpperCase)
+          } else if (systemDefaultOffset != null) {
+            OffsetType.valueOf(systemDefaultOffset.toUpperCase)
           } else {
             info("No default offset for %s defined. Using upcoming." format systemStream)
             OffsetType.UPCOMING
index 00e65a7..fd508c2 100644 (file)
@@ -48,8 +48,6 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE)
 
-  def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
-
   def deleteCommittedMessages(systemName: String) = getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false)
 
   /**
index 1e62d94..71a902c 100644 (file)
@@ -81,9 +81,13 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     // Disable consumer auto-commit because Samza controls commits
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
+    // check if samza default offset value is defined
+    String systemOffsetDefault = new JavaSystemConfig(config).getSystemOffsetDefault(systemName);
+
     // Translate samza config value to kafka config value
-    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+    String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault);
+    LOG.info("setting auto.offset.reset for system {} to {}", systemName, autoOffsetReset);
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
     // if consumer bootstrap servers are not configured, get them from the producer configs
     if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
@@ -165,44 +169,64 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
   }
 
   /**
-   * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset),
-   * then need to convert them (see kafka.apache.org/documentation):
+   * If settings for Kafka Consumer auto.offset.reset is set - use it.
+   * If this setting is using the old (deprecated values) - translate them:
    * "largest" -> "latest"
    * "smallest" -> "earliest"
    *
-   * If no setting specified we return "latest" (same as Kafka).
-   * @param autoOffsetReset value from the app provided config
+   * If no setting specified - match it to the Samza default offset setting.
+   * If none defined - return "latest"
+   * @param autoOffsetReset consumer.auto.offset.reset config
+   * @param samzaOffsetDefault  samza system default
    * @return String representing the config value for "auto.offset.reset" property
    */
-  static String getAutoOffsetResetValue(final String autoOffsetReset) {
-    final String SAMZA_OFFSET_LARGEST = "largest";
-    final String SAMZA_OFFSET_SMALLEST = "smallest";
+  static String getAutoOffsetResetValue(final String autoOffsetReset, final String samzaOffsetDefault) {
+    // valid kafka consumer values
     final String KAFKA_OFFSET_LATEST = "latest";
     final String KAFKA_OFFSET_EARLIEST = "earliest";
     final String KAFKA_OFFSET_NONE = "none";
 
-    if (autoOffsetReset == null) {
-      return KAFKA_OFFSET_LATEST; // return default
-    }
+    // if the value for KafkaConsumer is set - use it.
+    if (!StringUtils.isBlank(autoOffsetReset)) {
+      if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+          || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+        return autoOffsetReset;
+      }
+      // translate old kafka consumer values into new ones (SAMZA-1987 top remove it)
+      String newAutoOffsetReset = null;
+      switch (autoOffsetReset) {
+        case "largest":
+          newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+          LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, KAFKA_OFFSET_LATEST);
+          break;
+        case "smallest":
+          newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+          LOG.warn("Using old (deprecated) value for kafka consumer config auto.offset.reset = {}. The right value should be {}", autoOffsetReset, KAFKA_OFFSET_EARLIEST);
+          break;
+        default:
+          throw new SamzaException("Using invalid value for kafka consumer config auto.offset.reset " + autoOffsetReset + ". See KafkaConsumer config for the correct values.");
+      }
 
-    // accept kafka values directly
-    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
-        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
-      return autoOffsetReset;
+      LOG.info("Auto offset reset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+      return newAutoOffsetReset;
     }
 
-    String newAutoOffsetReset;
-    switch (autoOffsetReset) {
-      case SAMZA_OFFSET_LARGEST:
-        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
-        break;
-      case SAMZA_OFFSET_SMALLEST:
-        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
-        break;
-      default:
-        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    // in case kafka consumer configs are not provided we should match them to Samza's ones.
+    String newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    if (!StringUtils.isBlank(samzaOffsetDefault)) {
+      switch (samzaOffsetDefault) {
+        case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING:
+          newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+          break;
+        case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST:
+          newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+          break;
+        default:
+          throw new SamzaException("Using invalid value for samza default offset config " + autoOffsetReset + ". See samza config for the correct values");
+      }
+      LOG.info("Auto offset reset value for KafkaConsumer for system {} converted from {}(samza) to {}", samzaOffsetDefault, newAutoOffsetReset);
     }
-    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+
     return newAutoOffsetReset;
   }
 }
\ No newline at end of file
index b5f283a..acef057 100644 (file)
@@ -305,6 +305,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
     // check if the proxy is running
     if (!proxy.isRunning()) {
+      LOG.info("{}: KafkaConsumerProxy is not running. Stopping the consumer.", this);
       stop();
       String message = String.format("%s: KafkaConsumerProxy has stopped.", this);
       throw new SamzaException(message, proxy.getFailureCause());
index 62f6269..128f9ba 100644 (file)
@@ -165,4 +165,118 @@ public class TestKafkaConsumerConfig {
 
     Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }
+
+  @Test
+  public void testResetValues() {
+    Map<String, String> props = new HashMap<>();
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "locahost:9092");
+    props.put(JobConfig.JOB_NAME(), JOB_NAME);
+
+
+    // largest -> latest
+    props.put(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "largest");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("latest", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+
+    // smallest -> earliest
+    props.put(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "smallest");
+
+    kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("earliest", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+    // earliest -> earliest
+    props.put(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+
+    kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("earliest", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+    // none -> none
+    props.put(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "none");
+
+    kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("none", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+
+    // someval -> latest
+    props.put(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "someval");
+
+    try {
+      kafkaConsumerConfig =
+          KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+      Assert.fail("Should've failed for invalid value for default offset reset");
+    } catch (Exception e) {
+      // expected
+    }
+
+    // no value -> latest
+    props.remove(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+    kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("latest", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+
+    // if samza system has a reset value - use it (override kafka
+    // upcoming -> latest
+    props.put(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+    props.put(String.format("systems.%s.samza.offset.default", SYSTEM_NAME), "upcoming");
+
+    kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("earliest", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+
+    // stream default should override it
+    props.remove(String.format("systems.%s.consumer.%s", SYSTEM_NAME, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    props.put(String.format("systems.%s.default.stream.samza.offset.default", SYSTEM_NAME), "oldest");
+
+    kafkaConsumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(props), SYSTEM_NAME, "client1");
+
+    Assert.assertEquals("earliest", kafkaConsumerConfig.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+  }
+
+  @Test
+  public void testKafkaAutoResetValue() {
+    Assert.assertEquals("latest",
+        KafkaConsumerConfig.getAutoOffsetResetValue("latest", "oldest"));
+
+    try {
+      KafkaConsumerConfig.getAutoOffsetResetValue("someValue", "oldest");
+      Assert.fail("Invalid value should've triggered an exception");
+    } catch (Exception e) {
+      // expected
+    }
+    Assert.assertEquals("earliest",
+        KafkaConsumerConfig.getAutoOffsetResetValue("earliest", "upcoming"));
+    Assert.assertEquals("none",
+        KafkaConsumerConfig.getAutoOffsetResetValue("none", "oldest"));
+    Assert.assertEquals("latest",
+        KafkaConsumerConfig.getAutoOffsetResetValue("largest", "oldest"));
+    Assert.assertEquals("earliest",
+        KafkaConsumerConfig.getAutoOffsetResetValue("smallest", "upcoming"));
+
+    Assert.assertEquals("earliest",
+        KafkaConsumerConfig.getAutoOffsetResetValue("", "oldest"));
+    Assert.assertEquals("latest",
+        KafkaConsumerConfig.getAutoOffsetResetValue("", "upcoming"));
+    try {
+      KafkaConsumerConfig.getAutoOffsetResetValue("", "whatever");
+      Assert.fail("Invalid value should've triggered an exception");
+    } catch (Exception e) {
+      //expected
+    }
+  }
 }