SAMZA-1387: Unable to Start Samza App Because Regex Check
[samza.git] / samza-kafka / src / main / scala / org / apache / samza / system / kafka / KafkaSystemAdmin.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 package org.apache.samza.system.kafka
21
22 import java.util
23 import java.util.{Properties, UUID}
24
25 import kafka.admin.AdminUtils
26 import kafka.api._
27 import kafka.common.TopicAndPartition
28 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
29 import kafka.utils.ZkUtils
30 import org.apache.kafka.common.errors.TopicExistsException
31 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
32 import org.apache.samza.system._
33 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
34 import org.apache.samza.{Partition, SamzaException}
35
36 import scala.collection.JavaConverters._
37
38
39 object KafkaSystemAdmin extends Logging {
40   // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
41   // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317 and 1387
42   val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
43   val COORDINATOR_STREAMID = "unused-temp-coordinator-stream-id"
44
45   /**
46    * A helper method that takes oldest, newest, and upcoming offsets for each
47    * system stream partition, and creates a single map from stream name to
48    * SystemStreamMetadata.
49    */
50   def assembleMetadata(oldestOffsets: Map[SystemStreamPartition, String], newestOffsets: Map[SystemStreamPartition, String], upcomingOffsets: Map[SystemStreamPartition, String]): Map[String, SystemStreamMetadata] = {
51     val allMetadata = (oldestOffsets.keySet ++ newestOffsets.keySet ++ upcomingOffsets.keySet)
52       .groupBy(_.getStream)
53       .map {
54         case (streamName, systemStreamPartitions) =>
55           val streamPartitionMetadata = systemStreamPartitions
56             .map(systemStreamPartition => {
57               val partitionMetadata = new SystemStreamPartitionMetadata(
58                 // If the topic/partition is empty then oldest and newest will
59                 // be stripped of their offsets, so default to null.
60                 oldestOffsets.getOrElse(systemStreamPartition, null),
61                 newestOffsets.getOrElse(systemStreamPartition, null),
62                 upcomingOffsets(systemStreamPartition))
63               (systemStreamPartition.getPartition, partitionMetadata)
64             })
65             .toMap
66           val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava)
67           (streamName, streamMetadata)
68       }
69       .toMap
70
71     // This is typically printed downstream and it can be spammy, so debug level here.
72     debug("Got metadata: %s" format allMetadata)
73
74     allMetadata
75   }
76 }
77
78 /**
79  * A helper class that is used to construct the changelog stream specific information
80  *
81  * @param replicationFactor The number of replicas for the changelog stream
82  * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
83  */
84 case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties)
85
86 /**
87  * A Kafka-based implementation of SystemAdmin.
88  */
89 class KafkaSystemAdmin(
90   /**
91    * The system name to use when creating SystemStreamPartitions to return in
92    * the getSystemStreamMetadata responser.
93    */
94   systemName: String,
95
96   // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
97   /**
98    * List of brokers that are part of the Kafka system that we wish to
99    * interact with. The format is host1:port1,host2:port2.
100    */
101   brokerListString: String,
102
103   /**
104    * A method that returns a ZkUtils for the Kafka system. This is invoked
105    * when the system admin is attempting to create a coordinator stream.
106    */
107   connectZk: () => ZkUtils,
108
109   /**
110    * Custom properties to use when the system admin tries to create a new
111    * coordinator stream.
112    */
113   coordinatorStreamProperties: Properties = new Properties,
114
115   /**
116    * The replication factor to use when the system admin creates a new
117    * coordinator stream.
118    */
119   coordinatorStreamReplicationFactor: Int = 1,
120
121   /**
122    * The timeout to use for the simple consumer when fetching metadata from
123    * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
124    */
125   timeout: Int = Int.MaxValue,
126
127   /**
128    * The buffer size to use for the simple consumer when fetching metadata
129    * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
130    * configuration.
131    */
132   bufferSize: Int = ConsumerConfig.SocketBufferSize,
133
134   /**
135    * The client ID to use for the simple consumer when fetching metadata from
136    * Kafka. Equivalent to Kafka's client.id configuration.
137    */
138   clientId: String = UUID.randomUUID.toString,
139
140   /**
141    * Replication factor for the Changelog topic in kafka
142    * Kafka properties to be used during the Changelog topic creation
143    */
144   topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends ExtendedSystemAdmin with Logging {
145
146   import KafkaSystemAdmin._
147
148   override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
149     getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
150   }
151
152   def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy, cacheTTL: Long = Long.MaxValue): util.Map[String, SystemStreamMetadata] = {
153     debug("Fetching system stream partition count for: %s" format streams)
154     var metadataTTL = cacheTTL
155     retryBackoff.run(
156       loop => {
157         val metadata = TopicMetadataCache.getTopicMetadata(
158           streams.asScala.toSet,
159           systemName,
160           getTopicMetadata,
161           metadataTTL)
162         val result = metadata.map {
163           case (topic, topicMetadata) => {
164             KafkaUtil.maybeThrowException(topicMetadata.errorCode)
165             val partitionsMap = topicMetadata.partitionsMetadata.map {
166               pm =>
167                 new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
168             }.toMap[Partition, SystemStreamPartitionMetadata]
169             (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava))
170           }
171         }
172         loop.done
173         result.asJava
174       },
175
176       (exception, loop) => {
177         warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
178         debug("Exception detail:", exception)
179         if (metadataTTL == Long.MaxValue) {
180           metadataTTL = 5000 // Revert to the default cache expiration
181         }
182       }
183     ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
184   }
185
186   /**
187    * Returns the offset for the message after the specified offset for each
188    * SystemStreamPartition that was passed in.
189    */
190
191   override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
192     // This is safe to do with Kafka, even if a topic is key-deduped. If the
193     // offset doesn't exist on a compacted topic, Kafka will return the first
194     // message AFTER the offset that was specified in the fetch request.
195     offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
196   }
197
198   override def getSystemStreamMetadata(streams: java.util.Set[String]) =
199     getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava
200
201   /**
202    * Given a set of stream names (topics), fetch metadata from Kafka for each
203    * stream, and return a map from stream name to SystemStreamMetadata for
204    * each stream. This method will return null for oldest and newest offsets
205    * if a given SystemStreamPartition is empty. This method will block and
206    * retry indefinitely until it gets a successful response from Kafka.
207    */
208   def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
209     debug("Fetching system stream metadata for: %s" format streams)
210     var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
211     retryBackoff.run(
212       loop => {
213         val metadata = TopicMetadataCache.getTopicMetadata(
214           streams.asScala.toSet,
215           systemName,
216           getTopicMetadata,
217           metadataTTL)
218
219         debug("Got metadata for streams: %s" format metadata)
220
221         val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
222         var oldestOffsets = Map[SystemStreamPartition, String]()
223         var newestOffsets = Map[SystemStreamPartition, String]()
224         var upcomingOffsets = Map[SystemStreamPartition, String]()
225
226         // Get oldest, newest, and upcoming offsets for each topic and partition.
227         for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) {
228           debug("Fetching offsets for %s:%s: %s" format (broker.host, broker.port, topicsAndPartitions))
229
230           val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
231           try {
232             upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime)
233             oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
234
235             // Kafka's "latest" offset is always last message in stream's offset +
236             // 1, so get newest message in stream by subtracting one. this is safe
237             // even for key-deduplicated streams, since the last message will
238             // never be deduplicated.
239             newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 1).toString)
240             // Keep only oldest/newest offsets where there is a message. Should
241             // return null offsets for empty streams.
242             upcomingOffsets.foreach {
243               case (topicAndPartition, offset) =>
244                 if (offset.toLong <= 0) {
245                   debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
246                   newestOffsets -= topicAndPartition
247                   debug("Setting oldest offset to 0 to consume from beginning")
248                   oldestOffsets += (topicAndPartition -> "0")
249                 }
250             }
251           } finally {
252             consumer.close
253           }
254         }
255
256         val result = assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
257         loop.done
258         result
259       },
260
261       (exception, loop) => {
262         warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
263         debug("Exception detail:", exception)
264         metadataTTL = 5000 // Revert to the default cache expiration
265       }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
266   }
267
268   /**
269    * Returns the newest offset for the specified SSP.
270    * This method is fast and targeted. It minimizes the number of kafka requests.
271    * It does not retry indefinitely if there is any failure.
272    * It returns null if the topic is empty. To get the offsets for *all*
273    * partitions, it would be more efficient to call getSystemStreamMetadata
274    */
275   override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
276     debug("Fetching newest offset for: %s" format ssp)
277     var offset: String = null
278     var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
279     var retries = maxRetries
280     new ExponentialSleepStrategy().run(
281       loop => {
282         val metadata = TopicMetadataCache.getTopicMetadata(
283           Set(ssp.getStream),
284           systemName,
285           getTopicMetadata,
286           metadataTTL)
287         debug("Got metadata for streams: %s" format metadata)
288
289         val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
290         val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId)
291         val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1
292
293         // Get oldest, newest, and upcoming offsets for each topic and partition.
294         debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition))
295         val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
296         try {
297           offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2
298
299           // Kafka's "latest" offset is always last message in stream's offset +
300           // 1, so get newest message in stream by subtracting one. this is safe
301           // even for key-deduplicated streams, since the last message will
302           // never be deduplicated.
303           if (offset.toLong <= 0) {
304             debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
305             offset = null
306           } else {
307             offset = (offset.toLong - 1).toString
308           }
309         } finally {
310           consumer.close
311         }
312
313         debug("Got offset %s for %s." format(offset, ssp))
314         loop.done
315       },
316
317       (exception, loop) => {
318         if (retries > 0) {
319           warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception))
320           metadataTTL = 0L // Force metadata refresh
321           retries -= 1
322         } else {
323           warn("Exception while trying to get offset for %s" format(ssp), exception)
324           loop.done
325           throw exception
326         }
327       })
328
329      offset
330   }
331
332   override def createCoordinatorStream(streamName: String) {
333     info("Attempting to create coordinator stream %s." format streamName)
334
335     val streamSpec = new KafkaStreamSpec(COORDINATOR_STREAMID, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
336
337     if (createStream(streamSpec)) {
338       info("Created coordinator stream %s." format streamName)
339     } else {
340       info("Coordinator stream %s already exists." format streamName)
341     }
342   }
343
344   /**
345    * Helper method to use topic metadata cache when fetching metadata, so we
346    * don't hammer Kafka more than we need to.
347    */
348   protected def getTopicMetadata(topics: Set[String]) = {
349     new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
350       .getTopicInfo(topics)
351   }
352
353   /**
354    * Break topic metadata topic/partitions into per-broker map so that we can
355    * execute only one offset request per broker.
356    */
357   private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = {
358     val brokersToTopicPartitions = metadata
359       .values
360       // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
361       .flatMap(topicMetadata => {
362         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
363         topicMetadata
364           .partitionsMetadata
365           // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
366           .map(partitionMetadata => {
367             val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
368             val leader = partitionMetadata
369               .leader
370               .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
371             (leader, topicAndPartition)
372           })
373       })
374
375       // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
376       .groupBy(_._1)
377       // Convert to a Map[Broker, Set[TopicAndPartition]]
378       .mapValues(_.map(_._2).toSet)
379
380     debug("Got topic partition data for brokers: %s" format brokersToTopicPartitions)
381
382     brokersToTopicPartitions
383   }
384
385   /**
386    * Use a SimpleConsumer to fetch either the earliest or latest offset from
387    * Kafka for each topic/partition in the topicsAndPartitions set. It is
388    * assumed that all topics/partitions supplied reside on the broker that the
389    * consumer is connected to.
390    */
391   private def getOffsets(consumer: SimpleConsumer, topicsAndPartitions: Set[TopicAndPartition], earliestOrLatest: Long) = {
392     debug("Getting offsets for %s using earliest/latest value of %s." format (topicsAndPartitions, earliestOrLatest))
393
394     var offsets = Map[SystemStreamPartition, String]()
395     val partitionOffsetInfo = topicsAndPartitions
396       .map(topicAndPartition => (topicAndPartition, PartitionOffsetRequestInfo(earliestOrLatest, 1)))
397       .toMap
398     val brokerOffsets = consumer
399       .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
400       .partitionErrorAndOffsets
401       .mapValues(partitionErrorAndOffset => {
402         KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
403         partitionErrorAndOffset.offsets.head
404       })
405
406     for ((topicAndPartition, offset) <- brokerOffsets) {
407       offsets += new SystemStreamPartition(systemName, topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> offset.toString
408     }
409
410     debug("Got offsets for %s using earliest/latest value of %s: %s" format (topicsAndPartitions, earliestOrLatest, offsets))
411
412     offsets
413   }
414
415   /**
416    * @inheritdoc
417    */
418   override def createStream(spec: StreamSpec): Boolean = {
419     val kSpec = KafkaStreamSpec.fromSpec(spec);
420     var streamCreated = false
421
422     new ExponentialSleepStrategy(initialDelayMs = 500).run(
423       loop => {
424         val zkClient = connectZk()
425         try {
426           AdminUtils.createTopic(
427             zkClient,
428             kSpec.getPhysicalName,
429             kSpec.getPartitionCount,
430             kSpec.getReplicationFactor,
431             kSpec.getProperties)
432         } finally {
433           zkClient.close
434         }
435
436         streamCreated = true
437         loop.done
438       },
439
440       (exception, loop) => {
441         exception match {
442           case e: TopicExistsException =>
443             streamCreated = false
444             loop.done
445           case e: Exception =>
446             warn("Failed to create topic %s: %s. Retrying." format (spec.getPhysicalName, e))
447             debug("Exception detail:", e)
448         }
449       })
450
451     streamCreated
452   }
453
454   /**
455     * @inheritdoc
456     *
457     * Validates a stream in Kafka. Should not be called before createStream(),
458     * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients,
459     * is not read-only and will auto-create a new topic.
460     */
461   override def validateStream(spec: StreamSpec): Unit = {
462     val topicName = spec.getPhysicalName
463     info("Validating topic %s." format topicName)
464
465     val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
466     var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
467     retryBackoff.run(
468       loop => {
469         val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
470         val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
471         val topicMetadata = topicMetadataMap(topicName)
472         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
473
474         val partitionCount = topicMetadata.partitionsMetadata.length
475         if (partitionCount != spec.getPartitionCount) {
476           throw new StreamValidationException("Topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, spec.getPartitionCount))
477         }
478
479         info("Successfully validated topic %s." format topicName)
480         loop.done
481       },
482
483       (exception, loop) => {
484         exception match {
485           case e: StreamValidationException => throw e
486           case e: Exception =>
487             warn("While trying to validate topic %s: %s. Retrying." format (topicName, e))
488             debug("Exception detail:", e)
489             metadataTTL = 5000L // Revert to the default value
490         }
491       })
492   }
493
494   /**
495     * Exception to be thrown when the change log stream creation or validation has failed
496     */
497   class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
498     def this(s: String) = this(s, null)
499   }
500
501   override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
502     val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
503     val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
504
505     if (createStream(spec)) {
506       info("Created changelog stream %s." format topicName)
507     } else {
508       info("Changelog stream %s already exists." format topicName)
509     }
510
511     validateStream(spec)
512   }
513
514   /**
515     * Validates a stream in Kafka. Should not be called before createStream(),
516     * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and
517     * will auto-create a new topic.
518     */
519   override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
520     validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions))
521   }
522
523   /**
524    * Compare the two offsets. Returns x where x < 0 if offset1 < offset2;
525    * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2.
526    *
527    * Currently it's used in the context of the broadcast streams to detect
528    * the mismatch between two streams when consuming the broadcast streams.
529    */
530   override def offsetComparator(offset1: String, offset2: String) = {
531     offset1.toLong compare offset2.toLong
532   }
533 }