y
authorBoris S <boryas@apache.org>
Wed, 31 Oct 2018 21:22:04 +0000 (14:22 -0700)
committerBoris S <bshkolnik@linkedin.com>
Wed, 31 Oct 2018 21:22:04 +0000 (14:22 -0700)
Author: Boris S <boryas@apache.org>
Author: Boris S <bshkolnik@linkedin.com>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Ray Matharu <rmatharu@linkedin.com>

Closes #779 from sborya/RemoveGetKafkaSystemConsumerConfig

samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala [new file with mode: 0644]
samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala

index ad17e82..1e62d94 100644 (file)
@@ -43,6 +43,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
   public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
 
   public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+  private static final int FETCH_MAX_BYTES = 1024 * 1024;
 
   private final String systemName;
   /*
@@ -123,6 +124,19 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
     return clientId;
   }
 
+  public int fetchMessageMaxBytes() {
+    String fetchSize = (String)get("fetch.message.max.bytes");
+    if (StringUtils.isBlank(fetchSize)) {
+      return FETCH_MAX_BYTES;
+    } else  {
+      return Integer.valueOf(fetchSize);
+    }
+  }
+
+  public String getZkConnect() {
+    return (String) get(ZOOKEEPER_CONNECT);
+  }
+
   // group id should be unique per job
   static String createConsumerGroupId(Config config) {
     Pair<String, String> jobNameId = getJobNameAndId(config);
index f492518..1954ac7 100644 (file)
@@ -303,24 +303,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     properties
   }
 
-  /**
-    * @deprecated Use KafkaConsumerConfig
-    */
-  @Deprecated
-  def getKafkaSystemConsumerConfig( systemName: String,
-                                    clientId: String,
-                                    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
-                                    injectedProps: Map[String, String] = Map()) = {
-
-    val subConf = config.subset("systems.%s.consumer." format systemName, true)
-    val consumerProps = new Properties()
-    consumerProps.putAll(subConf)
-    consumerProps.put("group.id", groupId)
-    consumerProps.put("client.id", clientId)
-    consumerProps.putAll(injectedProps.asJava)
-    new ConsumerConfig(consumerProps)
-  }
-
   def getKafkaSystemProducerConfig( systemName: String,
                                     clientId: String,
                                     injectedProps: Map[String, String] = Map()) = {
index a81ff13..e6068b0 100644 (file)
@@ -100,8 +100,8 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
     val systemName = config
       .getRegexResolvedSystem(rewriterName)
       .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "")
-    val zkConnect = Option(consumerConfig.zkConnect)
+    val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, "")
+    val zkConnect = Option(consumerConfig.getZkConnect)
       .getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
     val zkClient = new ZkClient(zkConnect, 6000, 6000)
 
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
new file mode 100644 (file)
index 0000000..02a6275
--- /dev/null
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config_deprecated
+
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern
+import java.util.{Properties, UUID}
+
+import com.google.common.collect.ImmutableMap
+import kafka.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
+import org.apache.samza.config._
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.{Logging, StreamUtil}
+
+import scala.collection.JavaConverters._
+
+object KafkaConfig {
+  val TOPIC_REPLICATION_FACTOR = "replication.factor"
+  val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
+
+  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
+  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
+  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+
+  val SEGMENT_BYTES = "segment.bytes"
+
+  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR
+  val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint." + SEGMENT_BYTES
+
+  val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
+  val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
+  val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
+  // The default segment size to use for changelog topics
+  val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
+
+  // Helper regular expression definitions to extract/match configurations
+  val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
+
+  val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR
+  val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES
+
+  val CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s"
+  val PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers"
+  val PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s"
+  val CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect"
+
+  /**
+    * Defines how low a queue can get for a single system/stream/partition
+    * combination before trying to fetch more messages for it.
+    */
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+
+  val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
+
+  /**
+    * Defines how many bytes to use for the buffered prefetch messages for job as a whole.
+    * The bytes for a single system/stream/partition are computed based on this.
+    * This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be
+    * the bytes limit + size of max message in the partition for a given stream.
+    * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
+    */
+  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+
+  val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
+
+  implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
+}
+
+class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
+  /**
+    * Gets the System to use for reading/writing checkpoints. Uses the following precedence.
+    *
+    * 1. If task.checkpoint.system is defined, that value is used.
+    * 2. If job.default.system is defined, that value is used.
+    * 3. None
+    */
+  def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM, new JobConfig(config).getDefaultSystem.orNull))
+
+  /**
+    * Gets the replication factor for the checkpoint topic. Uses the following precedence.
+    *
+    * 1. If task.checkpoint.replication.factor is configured, that value is used.
+    * 2. If systems.checkpoint-system.default.stream.replication.factor is configured, that value is used.
+    * 3. None
+    *
+    * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
+    */
+  def getCheckpointReplicationFactor() = {
+    val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, "3")
+    val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor)
+
+    Option(replicationFactor)
+  }
+
+  private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
+    val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+    defaultReplicationFactor
+  }
+
+  /**
+    * Gets the segment bytes for the checkpoint topic. Uses the following precedence.
+    *
+    * 1. If task.checkpoint.segment.bytes is configured, that value is used.
+    * 2. If systems.checkpoint-system.default.stream.segment.bytes is configured, that value is used.
+    * 3. None
+    *
+    * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
+    */
+  def getCheckpointSegmentBytes() = {
+    val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+    getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
+  }
+
+  /**
+    * Gets the replication factor for the coordinator topic. Uses the following precedence.
+    *
+    * 1. If job.coordinator.replication.factor is configured, that value is used.
+    * 2. If systems.coordinator-system.default.stream.replication.factor is configured, that value is used.
+    * 3. 3
+    *
+    * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
+    */
+  def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR) match {
+    case Some(rplFactor) => rplFactor
+    case _ =>
+      val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
+      val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+      systemReplicationFactor
+  }
+
+  /**
+    * Gets the segment bytes for the coordinator topic. Uses the following precedence.
+    *
+    * 1. If job.coordinator.segment.bytes is configured, that value is used.
+    * 2. If systems.coordinator-system.default.stream.segment.bytes is configured, that value is used.
+    * 3. None
+    *
+    * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
+    */
+  def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES) match {
+    case Some(segBytes) => segBytes
+    case _ =>
+      val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
+      val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+      segBytes
+  }
+
+  // custom consumer config
+  def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
+
+  def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
+
+  def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
+
+  /**
+    * Returns a map of topic -> fetch.message.max.bytes value for all streams that
+    * are defined with this property in the config.
+    */
+  def getFetchMessageMaxBytesTopics(systemName: String) = {
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    subConf
+      .asScala
+      .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
+      .map {
+        case (fetchMessageMaxBytes, fetchSizeValue) =>
+          (fetchMessageMaxBytes.replace(".consumer.fetch.message.max.bytes", ""), fetchSizeValue.toInt)
+      }.toMap
+  }
+
+  /**
+    * Returns a map of topic -> auto.offset.reset value for all streams that
+    * are defined with this property in the config.
+    */
+  def getAutoOffsetResetTopics(systemName: String) = {
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    subConf
+      .asScala
+      .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
+      .map {
+        case (topicAutoOffsetReset, resetValue) =>
+          (topicAutoOffsetReset.replace(".consumer.auto.offset.reset", ""), resetValue)
+      }.toMap
+  }
+
+  // regex resolver
+  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
+  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
+  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+
+  /**
+    * Gets the replication factor for the changelog topics. Uses the following precedence.
+    *
+    * 1. If stores.myStore.changelog.replication.factor is configured, that value is used.
+    * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used.
+    * 3. 2
+    *
+    * Note that the changelog-system has a similar precedence. See [[JavaStorageConfig]]
+    */
+  def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+
+  def getDefaultChangelogStreamReplicationFactor() = {
+    val changelogSystem =  new JavaStorageConfig(config).getChangelogSystem()
+    getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2"))
+  }
+
+  // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream
+  def getKafkaChangelogEnabledStores() = {
+    val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
+    var storeToChangelog = Map[String, String]()
+    val storageConfig = new StorageConfig(config)
+    val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
+
+    for ((changelogConfig, cn) <- changelogConfigs) {
+      // Lookup the factory for this particular stream and verify if it's a kafka system
+
+      val matcher = pattern.matcher(changelogConfig)
+      val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
+
+      storageConfig.getChangelogStream(storeName).foreach(changelogName => {
+        val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
+        val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
+        storeToChangelog += storeName -> systemStream.getStream
+      })
+    }
+    storeToChangelog
+  }
+
+  // Get all kafka properties for changelog stream topic creation
+  def getChangelogKafkaProperties(name: String) = {
+    val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
+    val kafkaChangeLogProperties = new Properties
+
+    val appConfig = new ApplicationConfig(config)
+    // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
+    // 1.0.2, or 1.1.0 (see KAFKA-6568)
+
+    // Adjust changelog topic setting, when TTL is set on a RocksDB store
+    //  - Disable log compaction on Kafka changelog topic
+    //  - Set topic TTL to be the same as RocksDB TTL
+    Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+      case Some(rocksDbTtl) =>
+        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+          kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
+          if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
+            kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
+          }
+        }
+      case _ =>
+        kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+    }
+
+    kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
+    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
+    filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
+    kafkaChangeLogProperties
+  }
+
+  // Set the checkpoint topic configs to have a very small segment size and
+  // enable log compaction. This keeps job startup time small since there
+  // are fewer useless (overwritten) messages to read from the checkpoint
+  // topic.
+  def getCheckpointTopicProperties() = {
+    val segmentBytes: Int = getCheckpointSegmentBytes()
+    val appConfig = new ApplicationConfig(config)
+    val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
+    val properties = new Properties()
+
+    if (isStreamMode) {
+      properties.putAll(ImmutableMap.of(
+        "cleanup.policy", "compact",
+        "segment.bytes", String.valueOf(segmentBytes)))
+    } else {
+      properties.putAll(ImmutableMap.of(
+        "cleanup.policy", "compact,delete",
+        "retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
+        "segment.bytes", String.valueOf(segmentBytes)))
+    }
+    properties
+  }
+
+  /**
+    * @deprecated  Use KafkaConsumerConfig
+    */
+  @Deprecated
+  def getKafkaSystemConsumerConfig( systemName: String,
+                                    clientId: String,
+                                    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
+                                    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.consumer." format systemName, true)
+    val consumerProps = new Properties()
+    consumerProps.putAll(subConf)
+    consumerProps.put("group.id", groupId)
+    consumerProps.put("client.id", clientId)
+    consumerProps.putAll(injectedProps.asJava)
+    new ConsumerConfig(consumerProps)
+  }
+
+  def getKafkaSystemProducerConfig( systemName: String,
+                                    clientId: String,
+                                    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.producer." format systemName, true)
+    val producerProps = new util.HashMap[String, String]()
+    producerProps.putAll(subConf)
+    producerProps.put("client.id", clientId)
+    producerProps.putAll(injectedProps.asJava)
+    new KafkaProducerConfig(systemName, clientId, producerProps)
+  }
+}
+
+class KafkaProducerConfig(val systemName: String,
+                          val clientId: String = "",
+                          properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging {
+
+  // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
+  val RECONNECT_BACKOFF_MS_DEFAULT = 10L
+
+  //Overrides specific to samza-kafka (these are considered as defaults in Samza & can be overridden by user
+  val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
+  val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
+  val LINGER_MS_DEFAULT: java.lang.Integer = 10
+
+  def getProducerProperties = {
+
+    val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
+    val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
+    producerProperties.putAll(properties)
+
+    if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+      producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
+    }
+
+    if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+      producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
+    }
+
+    if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
+      && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
+      warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
+        format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
+    } else {
+      producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
+    }
+
+    if (!producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT))
+      producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
+    }
+    producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt // Verify int
+
+    if (!producerProperties.containsKey(ProducerConfig.LINGER_MS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT))
+      producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT)
+    }
+    producerProperties.get(ProducerConfig.LINGER_MS_CONFIG).toString.toInt // Verify int
+
+    producerProperties
+  }
+
+  val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
+    .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
+
+  val bootsrapServers = {
+    if (properties.containsKey("metadata.broker.list"))
+      warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated . Samza has been upgraded " +
+        "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
+    Option(properties.get("bootstrap.servers"))
+      .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
+      .asInstanceOf[String]
+  }
+}
index eecdbe4..d588831 100644 (file)
@@ -47,13 +47,14 @@ object KafkaSystemFactory extends Logging {
 
 class KafkaSystemFactory extends SystemFactory with Logging {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
+    val kafkaConfig:org.apache.samza.config_deprecated.KafkaConfig = config
     val clientId = getClientId("samza-consumer", config)
     val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
 
     // Kind of goofy to need a producer config for consumers, but we need metadata.
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId)
 
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
@@ -104,10 +105,11 @@ class KafkaSystemFactory extends SystemFactory with Logging {
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {
+    val kafkaConfig:org.apache.samza.config_deprecated.KafkaConfig = config
     val clientId = getClientId("samza-admin", config)
     val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
     val bootstrapServers = producerConfig.bootsrapServers
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId)
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
     val zkConnect = Option(consumerConfig.zkConnect)
index b8467b8..1f402c8 100644 (file)
 package org.apache.samza.config
 
 import java.util.Properties
+
 import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.clients.producer.ProducerConfig
@@ -41,56 +43,22 @@ class TestKafkaConfig {
     props = new Properties
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
     props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
   }
 
-  @Test
-  def testIdGeneration = {
-    val factory = new PropertiesConfigFactory()
-    props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
-
-    val mapConfig = new MapConfig(props.asScala.asJava)
-    val kafkaConfig = new KafkaConfig(mapConfig)
-
-    val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1")
-    val consumerClientId1 = consumerConfig1.clientId
-    val groupId1 = consumerConfig1.groupId
-    val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId2")
-    val consumerClientId2 = consumerConfig2.clientId
-    val groupId2 = consumerConfig2.groupId
-    assert(consumerClientId1.equals("TestClientId1"))
-    assert(consumerClientId2.equals("TestClientId2"))
-    assert(groupId1.startsWith("undefined-samza-consumer-group-"))
-    assert(groupId2.startsWith("undefined-samza-consumer-group-"))
-    assert(consumerClientId1 != consumerClientId2)
-    assert(groupId1 != groupId2)
-
-    val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID, TEST_GROUP_ID)
-    val consumerClientId3 = consumerConfig3.clientId
-    val groupId3 = consumerConfig3.groupId
-    assert(consumerClientId3.equals(TEST_CLIENT_ID))
-    assert(groupId3.equals(TEST_GROUP_ID))
-
-    val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId1")
-    val producerClientId1 = producerConfig1.clientId
-    val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId2")
-    val producerClientId2 = producerConfig2.clientId
-
-    assert(producerClientId1.equals("TestClientId1"))
-    assert(producerClientId2.equals("TestClientId2"))
-  }
 
   @Test
   def testStreamLevelFetchSizeOverride() {
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
+    val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(mapConfig, SYSTEM_NAME, TEST_CLIENT_ID)
     // default fetch size
     assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
 
     props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144")
     val mapConfig1 = new MapConfig(props.asScala.asJava)
     val kafkaConfig1 = new KafkaConfig(mapConfig1)
-    val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
+    val consumerConfig1 = KafkaConsumerConfig.getKafkaSystemConsumerConfig(mapConfig1, SYSTEM_NAME, TEST_CLIENT_ID)
     // shared fetch size
     assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)