SAMZA-1888: Kafka consumer improvements
authorBoris S <bshkolnik@linkedin.com>
Wed, 17 Oct 2018 23:38:19 +0000 (16:38 -0700)
committerBoris S <bshkolnik@linkedin.com>
Wed, 17 Oct 2018 23:38:19 +0000 (16:38 -0700)
Author: Boris S <bshkolnik@linkedin.com>
Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: bharathkk <codin.martial@gmail.com>

Closes #738 from sborya/KafkaConsumerImprovements

samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala [deleted file]
samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala [deleted file]
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala [deleted file]

index f761ab3..d2ceafb 100644 (file)
@@ -80,9 +80,9 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
 
   protected final String systemName;
   protected final Consumer metadataConsumer;
+  protected final Config  config;
 
-  // get ZkUtils object to connect to Kafka's ZK.
-  private final Supplier<ZkUtils> getZkConnection;
+  protected AdminClient adminClient = null;
 
   // Custom properties to create a new coordinator stream.
   private final Properties coordinatorStreamProperties;
@@ -96,16 +96,14 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
   // Kafka properties for intermediate topics creation
   private final Map<String, Properties> intermediateStreamProperties;
 
-  // adminClient is required for deleteCommittedMessages operation
-  private final AdminClient adminClient;
-
   // used for intermediate streams
-  private final boolean deleteCommittedMessages;
+  protected final boolean deleteCommittedMessages;
 
   private final AtomicBoolean stopped = new AtomicBoolean(false);
 
   public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
     this.systemName = systemName;
+    this.config = config;
 
     if (metadataConsumer == null) {
       throw new SamzaException(
@@ -113,35 +111,6 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
     }
     this.metadataConsumer = metadataConsumer;
 
-    // populate brokerList from either consumer or producer configs
-    Properties props = new Properties();
-    String brokerList = config.get(
-        String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-    if (brokerList == null) {
-      brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
-          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-    }
-    if (brokerList == null) {
-      throw new SamzaException(
-          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
-    }
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-
-    // kafka.admin.AdminUtils requires zkConnect
-    // this will change after we move to the new org.apache..AdminClient
-    String zkConnect =
-        config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
-    if (StringUtils.isBlank(zkConnect)) {
-      throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
-    }
-    props.put(ZOOKEEPER_CONNECT, zkConnect);
-
-    adminClient = AdminClient.create(props);
-
-    getZkConnection = () -> {
-      return ZkUtils.apply(zkConnect, 6000, 6000, false);
-    };
-
     KafkaConfig kafkaConfig = new KafkaConfig(config);
     coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
     coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);
@@ -197,6 +166,8 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
       } catch (Exception e) {
         LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
       }
+    }
+    if (adminClient != null) {
       try {
         adminClient.close();
       } catch (Exception e) {
@@ -546,14 +517,14 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
   public boolean createStream(StreamSpec streamSpec) {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
 
-    return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection);
+    return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection());
   }
 
   @Override
   public boolean clearStream(StreamSpec streamSpec) {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
 
-    KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection);
+    KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection());
 
     Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName()));
     return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty();
@@ -630,11 +601,56 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
+      if (adminClient == null) {
+        adminClient = AdminClient.create(createAdminClientProperties());
+      }
       KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets);
       deleteMessageCalled = true;
     }
   }
 
+  protected Properties createAdminClientProperties() {
+    // populate brokerList from either consumer or producer configs
+    Properties props = new Properties();
+    // included SSL settings if needed
+
+    props.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true));
+
+    //validate brokerList
+    String brokerList = config.get(
+        String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    if (brokerList == null) {
+      brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
+          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+    if (brokerList == null) {
+      throw new SamzaException(
+          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
+    }
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+
+
+    // kafka.admin.AdminUtils requires zkConnect
+    // this will change after we move to the new org.apache..AdminClient
+    String zkConnect =
+        config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
+    if (StringUtils.isBlank(zkConnect)) {
+      throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
+    }
+    props.put(ZOOKEEPER_CONNECT, zkConnect);
+
+    return props;
+  }
+
+  private Supplier<ZkUtils> getZkConnection() {
+    String zkConnect =
+        config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
+    if (StringUtils.isBlank(zkConnect)) {
+      throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
+    }
+    return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
+  }
+
   /**
    * Container for metadata about offsets.
    */
index 65d0e42..b5f283a 100644 (file)
@@ -77,8 +77,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
   /**
    * Create a KafkaSystemConsumer for the provided {@code systemName}
+   * @param kafkaConsumer kafka Consumer object to be used by this system consumer
    * @param systemName system name for which we create the consumer
    * @param config application config
+   * @param clientId clientId from the kafka consumer to be used in the KafkaConsumerProxy
    * @param metrics metrics for this KafkaSystemConsumer
    * @param clock system clock
    */
@@ -106,12 +108,13 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
   /**
    * Create internal kafka consumer object, which will be used in the Proxy.
+   * @param <K> key type for the consumer
+   * @param <V> value type for the consumer
    * @param systemName system name for which we create the consumer
    * @param kafkaConsumerConfig config object for Kafka's KafkaConsumer
-   * @return KafkaConsumer object
+   * @return KafkaConsumer newly created kafka consumer object
    */
-  public static <K,V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName,
-      HashMap<String, Object> kafkaConsumerConfig) {
+  public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, HashMap<String, Object> kafkaConsumerConfig) {
 
     LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig);
     return new KafkaConsumer<>(kafkaConsumerConfig);
@@ -176,7 +179,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
         throw new SamzaException(msg, e);
       }
 
-      LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString);
+      LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
 
       // add the partition to the proxy
       proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
@@ -310,16 +313,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return super.poll(systemStreamPartitions, timeout);
   }
 
-  /**
-   * convert from TopicPartition to TopicAndPartition
-   */
   public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
     return new TopicAndPartition(tp.topic(), tp.partition());
   }
 
-  /**
-   * convert to TopicPartition from SystemStreamPartition
-   */
   public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
     return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
   }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
deleted file mode 100644 (file)
index 5b4886a..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.consumer.SimpleConsumer
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.consumer.ConsumerConfig
-
-class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int,
-  clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes,
-  minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs)
-  extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
-
-  def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
-    val fbr = new FetchRequestBuilder().maxWait(maxWait)
-      .minBytes(minBytes)
-      .clientId(clientId)
-
-    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize.streamValue.getOrElse(f._1.topic, fetchSize.defaultValue)))
-
-    this.fetch(fbr.build())
-  }
-
-  override def close(): Unit = super.close()
-
-  override def send(request: TopicMetadataRequest): TopicMetadataResponse = super.send(request)
-
-  override def fetch(request: FetchRequest): FetchResponse = super.fetch(request)
-
-  override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = super.getOffsetsBefore(request)
-
-  override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = super.commitOffsets(request)
-
-  override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = super.fetchOffsets(request)
-
-  override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
-}
-
-/**
- * a simple class for holding values for the stream's fetch size (fetch.message.max.bytes).
- * The stream-level fetch size values are put in the streamValue map streamName -> fetchSize.
- * If stream-level fetch size is not defined, use the default value. The default value is the
- * Kafka's default fetch size value or the system-level fetch size value (if defined).
- */
-case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map[String, Int]())
-
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
deleted file mode 100644 (file)
index 55b4611..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.api.PartitionOffsetRequestInfo
-import org.apache.samza.util.Logging
-import org.apache.samza.util.KafkaUtil
-
-/**
- * GetOffset validates offsets for topic partitions, and manages fetching new
- * offsets for topics using Kafka's auto.offset.reset configuration.
- */
-class GetOffset(
-  /**
-   * The default auto.offset.reset to use if a topic is not overridden in
-   * autoOffsetResetTopics. Any value other than "earliest" or "latest" will
-   * result in an exception when getRestOffset is called.
-   */
-  default: String,
-
-  /**
-   * Topic-level overrides for auto.offset.reset. Any value other than
-   * "earliest" or "latest" will result in an exception when getRestOffset is
-   * called.
-   */
-  autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss {
-
-  /**
-   * Checks if an offset is valid for a given topic/partition. Validity is
-   * defined as an offset that returns a readable non-empty message set with
-   * no exceptions.
-   */
-  def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
-    info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
-
-    try {
-      val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
-
-      if (messages.hasError) {
-        KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
-      }
-
-      info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
-
-      true
-    } catch {
-      case e: OffsetOutOfRangeException => false
-    }
-  }
-
-  /**
-   * Uses a topic's auto.offset.reset setting (defined via the
-   * autoOffsetResetTopics map in the constructor) to fetch either the
-   * earliest or latest offset. If neither earliest or latest is defined for
-   * the topic in question, the default supplied in the constructor will be
-   * used.
-   */
-  def getResetOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition) = {
-    val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(getAutoOffset(topicAndPartition.topic), 1)))
-    val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
-    val partitionOffsetResponse = offsetResponse
-      .partitionErrorAndOffsets
-      .get(topicAndPartition)
-      .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
-
-    KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception())
-
-    partitionOffsetResponse
-      .offsets
-      .headOption
-      .getOrElse(toss("Got response, but no offsets defined for %s" format topicAndPartition))
-  }
-
-  /**
-   * Returns either the earliest or latest setting (a Kafka constant) for a
-   * given topic using the autoOffsetResetTopics map defined in the
-   * constructor. If the topic is not defined in autoOffsetResetTopics, the
-   * default value supplied in the constructor will be used. This is used in
-   * conjunction with getResetOffset to fetch either the earliest or latest
-   * offset for a topic.
-   */
-  private def getAutoOffset(topic: String): Long = {
-    info("Checking if auto.offset.reset is defined for topic %s" format (topic))
-    autoOffsetResetTopics.getOrElse(topic, default) match {
-      case OffsetRequest.LargestTimeString =>
-        info("Got reset of type %s." format OffsetRequest.LargestTimeString)
-        OffsetRequest.LatestTime
-      case OffsetRequest.SmallestTimeString =>
-        info("Got reset of type %s." format OffsetRequest.SmallestTimeString)
-        OffsetRequest.EarliestTime
-      case other => toss("Can't get offset value for topic %s due to invalid value: %s" format (topic, other))
-    }
-  }
-}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
deleted file mode 100644 (file)
index ab82609..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import java.nio.ByteBuffer
-
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.Message
-import kafka.message.ByteBufferMessageSet
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.junit._
-import org.junit.Assert._
-import org.mockito.Mockito
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-
-class TestGetOffset {
-
-  private val outOfRangeOffset : String = "0"
-
-  /**
-   * An empty message set is still a valid offset. It just means that the
-   * offset was for the upcoming message, which hasn't yet been written. The
-   * fetch request times out in such a case, and an empty message set is
-   * returned.
-   */
-  @Test
-  def testIsValidOffsetWorksWithEmptyMessageSet {
-    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
-    // Should not throw an exception.
-    assertTrue(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), "1234"))
-  }
-
-  /**
-    * An empty message set is still a valid offset. It just means that the
-    * offset was for the upcoming message, which hasn't yet been written. The
-    * fetch request times out in such a case, and an empty message set is
-    * returned.
-    */
-  @Test
-  def testIsValidOffsetWorksWithOffsetOutOfRangeException {
-    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
-    // Should not throw an exception.
-    assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset))
-  }
-
-  /**
-   * Create a default fetch simple consumer that returns empty message sets.
-   */
-  def getMockDefaultFetchSimpleConsumer = {
-    new DefaultFetchSimpleConsumer("", 0, 0, 0, "") {
-      val sc = Mockito.mock(classOf[SimpleConsumer])
-
-      // Build an empty fetch response.
-      val fetchResponse = {
-        val fetchResponse = Mockito.mock(classOf[FetchResponse])
-        val messageSet = {
-          val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
-          val messages = List()
-
-          def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
-
-          when(messageSet.sizeInBytes).thenReturn(0)
-          when(messageSet.size).thenReturn(0)
-          when(messageSet.iterator).thenReturn(messages.iterator)
-
-          messageSet
-        }
-        when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
-
-        fetchResponse
-      }
-
-      doAnswer(new Answer[FetchResponse] {
-          override def answer(invocation: InvocationOnMock): FetchResponse = {
-            if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists(
-              req => req._2.offset.toString.equals(outOfRangeOffset))) {
-              throw new OffsetOutOfRangeException("test exception")
-            }
-            fetchResponse
-          }
-        }).when(sc).fetch(any(classOf[FetchRequest]))
-
-      override def fetch(request: FetchRequest): FetchResponse = {
-        sc.fetch(request)
-      }
-    }
-  }
-}