124c85a567fb2ed0fb1819e815d68823944b4f79
[samza.git] / samza-kafka / src / main / scala / org / apache / samza / config / KafkaConfig.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 package org.apache.samza.config
21
22
23 import java.util
24 import java.util.concurrent.TimeUnit
25 import java.util.regex.Pattern
26 import java.util.{Properties, UUID}
27
28 import com.google.common.collect.ImmutableMap
29 import kafka.consumer.ConsumerConfig
30 import org.apache.kafka.clients.producer.ProducerConfig
31 import org.apache.kafka.common.serialization.ByteArraySerializer
32 import org.apache.samza.SamzaException
33 import org.apache.samza.config.ApplicationConfig.ApplicationMode
34 import org.apache.samza.config.SystemConfig.Config2System
35 import org.apache.samza.util.{Logging, Util}
36
37 import scala.collection.JavaConverters._
38
39 object KafkaConfig {
40   val TOPIC_REPLICATION_FACTOR = "replication.factor"
41   val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
42
43   val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
44   val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
45   val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
46
47   val SEGMENT_BYTES = "segment.bytes"
48
49   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
50   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR
51   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint." + SEGMENT_BYTES
52
53   val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
54   val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
55   val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
56   // The default segment size to use for changelog topics
57   val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
58
59   // Helper regular expression definitions to extract/match configurations
60   val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
61
62   val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR
63   val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES
64
65   /**
66     * Defines how low a queue can get for a single system/stream/partition
67     * combination before trying to fetch more messages for it.
68     */
69   val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
70
71   val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
72
73   /**
74     * Defines how many bytes to use for the buffered prefetch messages for job as a whole.
75     * The bytes for a single system/stream/partition are computed based on this.
76     * This fetches wholes messages, hence this bytes limit is a soft one, and the actual usage can be
77     * the bytes limit + size of max message in the partition for a given stream.
78     * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
79     */
80   val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
81
82   val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
83
84   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
85 }
86
87 class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
88   /**
89     * Gets the System to use for reading/writing checkpoints. Uses the following precedence.
90     *
91     * 1. If task.checkpoint.system is defined, that value is used.
92     * 2. If job.default.system is defined, that value is used.
93     * 3. None
94     */
95   def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM, new JobConfig(config).getDefaultSystem.orNull))
96
97   /**
98     * Gets the replication factor for the checkpoint topic. Uses the following precedence.
99     *
100     * 1. If task.checkpoint.replication.factor is configured, that value is used.
101     * 2. If systems.checkpoint-system.default.stream.replication.factor is configured, that value is used.
102     * 3. None
103     *
104     * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
105     */
106   def getCheckpointReplicationFactor() = {
107     val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, "3")
108     val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor)
109
110     Option(replicationFactor)
111   }
112
113   private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = {
114     val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
115     defaultReplicationFactor
116   }
117
118   /**
119     * Gets the segment bytes for the checkpoint topic. Uses the following precedence.
120     *
121     * 1. If task.checkpoint.segment.bytes is configured, that value is used.
122     * 2. If systems.checkpoint-system.default.stream.segment.bytes is configured, that value is used.
123     * 3. None
124     *
125     * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
126     */
127   def getCheckpointSegmentBytes() = {
128     val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
129     getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
130   }
131
132   /**
133     * Gets the replication factor for the coordinator topic. Uses the following precedence.
134     *
135     * 1. If job.coordinator.replication.factor is configured, that value is used.
136     * 2. If systems.coordinator-system.default.stream.replication.factor is configured, that value is used.
137     * 3. 3
138     *
139     * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
140     */
141   def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR) match {
142     case Some(rplFactor) => rplFactor
143     case _ =>
144       val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
145       val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
146       systemReplicationFactor
147   }
148
149   /**
150     * Gets the segment bytes for the coordinator topic. Uses the following precedence.
151     *
152     * 1. If job.coordinator.segment.bytes is configured, that value is used.
153     * 2. If systems.coordinator-system.default.stream.segment.bytes is configured, that value is used.
154     * 3. None
155     *
156     * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
157     */
158   def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES) match {
159     case Some(segBytes) => segBytes
160     case _ =>
161       val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
162       val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
163       segBytes
164   }
165
166   // custom consumer config
167   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
168
169   def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
170
171   def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
172
173   /**
174     * Returns a map of topic -> fetch.message.max.bytes value for all streams that
175     * are defined with this property in the config.
176     */
177   def getFetchMessageMaxBytesTopics(systemName: String) = {
178     val subConf = config.subset("systems.%s.streams." format systemName, true)
179     subConf
180       .asScala
181       .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
182       .map {
183         case (fetchMessageMaxBytes, fetchSizeValue) =>
184           (fetchMessageMaxBytes.replace(".consumer.fetch.message.max.bytes", ""), fetchSizeValue.toInt)
185       }.toMap
186   }
187
188   /**
189     * Returns a map of topic -> auto.offset.reset value for all streams that
190     * are defined with this property in the config.
191     */
192   def getAutoOffsetResetTopics(systemName: String) = {
193     val subConf = config.subset("systems.%s.streams." format systemName, true)
194     subConf
195       .asScala
196       .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
197       .map {
198         case (topicAutoOffsetReset, resetValue) =>
199           (topicAutoOffsetReset.replace(".consumer.auto.offset.reset", ""), resetValue)
200       }.toMap
201   }
202
203   // regex resolver
204   def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
205
206   def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
207
208   def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
209
210   /**
211     * Gets the replication factor for the changelog topics. Uses the following precedence.
212     *
213     * 1. If stores.myStore.changelog.replication.factor is configured, that value is used.
214     * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used.
215     * 3. 2
216     *
217     * Note that the changelog-system has a similar precedence. See [[JavaStorageConfig]]
218     */
219   def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor)
220
221   def getDefaultChangelogStreamReplicationFactor() = {
222     val changelogSystem =  new JavaStorageConfig(config).getChangelogSystem()
223     getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2"))
224   }
225
226   // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream
227   def getKafkaChangelogEnabledStores() = {
228     val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
229     var storeToChangelog = Map[String, String]()
230     val storageConfig = new StorageConfig(config)
231     val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
232
233     for ((changelogConfig, cn) <- changelogConfigs) {
234       // Lookup the factory for this particular stream and verify if it's a kafka system
235
236       val matcher = pattern.matcher(changelogConfig)
237       val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
238
239       val changelogName = storageConfig.getChangelogStream(storeName).getOrElse(throw new SamzaException("unable to get SystemStream for store:" + changelogConfig));
240       val systemStream = Util.getSystemStreamFromNames(changelogName)
241       val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
242       storeToChangelog += storeName -> systemStream.getStream
243     }
244     storeToChangelog
245   }
246
247   // Get all kafka properties for changelog stream topic creation
248   def getChangelogKafkaProperties(name: String) = {
249     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
250     val kafkaChangeLogProperties = new Properties
251
252     val appConfig = new ApplicationConfig(config)
253     // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.3,
254     // 1.0.2, or 1.1.0 (see KAFKA-6568)
255     // if (appConfig.getAppMode == ApplicationMode.STREAM) {
256     //  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
257     // } else{
258     //  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
259     //  kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
260     // }
261     kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
262     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
263     kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
264     filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
265     kafkaChangeLogProperties
266   }
267
268   // Set the checkpoint topic configs to have a very small segment size and
269   // enable log compaction. This keeps job startup time small since there
270   // are fewer useless (overwritten) messages to read from the checkpoint
271   // topic.
272   def getCheckpointTopicProperties() = {
273     val segmentBytes: Int = getCheckpointSegmentBytes()
274     val appConfig = new ApplicationConfig(config)
275     val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
276     val properties = new Properties()
277
278     if (isStreamMode) {
279       properties.putAll(ImmutableMap.of(
280         "cleanup.policy", "compact",
281         "segment.bytes", String.valueOf(segmentBytes)))
282     } else {
283       properties.putAll(ImmutableMap.of(
284         "cleanup.policy", "compact,delete",
285         "retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
286         "segment.bytes", String.valueOf(segmentBytes)))
287     }
288     properties
289   }
290
291   // kafka config
292   def getKafkaSystemConsumerConfig( systemName: String,
293                                     clientId: String,
294                                     groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
295                                     injectedProps: Map[String, String] = Map()) = {
296
297     val subConf = config.subset("systems.%s.consumer." format systemName, true)
298     val consumerProps = new Properties()
299     consumerProps.putAll(subConf)
300     consumerProps.put("group.id", groupId)
301     consumerProps.put("client.id", clientId)
302     consumerProps.putAll(injectedProps.asJava)
303     new ConsumerConfig(consumerProps)
304   }
305
306   def getKafkaSystemProducerConfig( systemName: String,
307                                     clientId: String,
308                                     injectedProps: Map[String, String] = Map()) = {
309
310     val subConf = config.subset("systems.%s.producer." format systemName, true)
311     val producerProps = new util.HashMap[String, String]()
312     producerProps.putAll(subConf)
313     producerProps.put("client.id", clientId)
314     producerProps.putAll(injectedProps.asJava)
315     new KafkaProducerConfig(systemName, clientId, producerProps)
316   }
317 }
318
319 class KafkaProducerConfig(val systemName: String,
320                           val clientId: String = "",
321                           properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging {
322
323   // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
324   val RECONNECT_BACKOFF_MS_DEFAULT = 10L
325
326   //Overrides specific to samza-kafka (these are considered as defaults in Samza & can be overridden by user
327   val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
328   val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
329   val LINGER_MS_DEFAULT: java.lang.Integer = 10
330
331   def getProducerProperties = {
332
333     val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
334     val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
335     producerProperties.putAll(properties)
336
337     if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
338       debug("%s undefined. Defaulting to %s." format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
339       producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
340     }
341
342     if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
343       debug("%s undefined. Defaulting to %s." format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
344       producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
345     }
346
347     if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
348       && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).asInstanceOf[String].toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
349       warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
350         format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
351     } else {
352       producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
353     }
354
355     if (!producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
356       debug("%s undefined. Defaulting to %s." format(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT))
357       producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
358     }
359     producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt // Verify int
360
361     if (!producerProperties.containsKey(ProducerConfig.LINGER_MS_CONFIG)) {
362       debug("%s undefined. Defaulting to %s." format(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT))
363       producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT)
364     }
365     producerProperties.get(ProducerConfig.LINGER_MS_CONFIG).toString.toInt // Verify int
366
367     producerProperties
368   }
369
370   val reconnectIntervalMs = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
371     .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT).asInstanceOf[Long]
372
373   val bootsrapServers = {
374     if (properties.containsKey("metadata.broker.list"))
375       warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated . Samza has been upgraded " +
376         "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
377     Option(properties.get("bootstrap.servers"))
378       .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
379       .asInstanceOf[String]
380   }
381 }