SAMZA-1921: upgrade to use the latest java AdminClient.
authorBoris S <bshkolnik@linkedin.com>
Wed, 7 Nov 2018 22:39:59 +0000 (14:39 -0800)
committerBoris S <bshkolnik@linkedin.com>
Wed, 7 Nov 2018 22:39:59 +0000 (14:39 -0800)
In this PR, I've refactored create/clear streams methods.

Author: Boris S <bshkolnik@linkedin.com>
Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>
Author: svenkata <svenkataraman@linkedin.com>

Reviewers: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Closes #789 from sborya/JavaAdminClient

samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdminUtilsScala.scala
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala

index 16a2e67..596b07a 100644 (file)
@@ -27,21 +27,29 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import kafka.admin.AdminClient;
 import kafka.utils.ZkUtils;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SystemConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
@@ -73,6 +81,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
   protected static final long DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS = 10000;
   protected static final int MAX_RETRIES_ON_EXCEPTION = 5;
   protected static final int DEFAULT_REPL_FACTOR = 2;
+  private static final int KAFKA_ADMIN_OPS_TIMEOUT_MS = 50000;
 
   // used in TestRepartitionJoinWindowApp TODO - remove SAMZA-1945
   @VisibleForTesting
@@ -80,9 +89,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
 
   protected final String systemName;
   protected final Consumer metadataConsumer;
-  protected final Config  config;
+  protected final Config config;
 
-  protected AdminClient adminClient = null;
+  protected kafka.admin.AdminClient adminClientForDelete = null;
 
   // Custom properties to create a new coordinator stream.
   private final Properties coordinatorStreamProperties;
@@ -99,6 +108,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
   // used for intermediate streams
   protected final boolean deleteCommittedMessages;
 
+  // admin client for create/remove topics
+  final AdminClient adminClient;
+
   private final AtomicBoolean stopped = new AtomicBoolean(false);
 
   public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
@@ -111,6 +123,10 @@ public class KafkaSystemAdmin implements SystemAdmin {
     }
     this.metadataConsumer = metadataConsumer;
 
+    Properties props = createAdminClientProperties();
+    LOG.info("New admin client with props:" + props);
+    adminClient = AdminClient.create(props);
+
     KafkaConfig kafkaConfig = new KafkaConfig(config);
     coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
     coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);
@@ -167,13 +183,17 @@ public class KafkaSystemAdmin implements SystemAdmin {
         LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
       }
     }
-    if (adminClient != null) {
+    if (adminClientForDelete != null) {
       try {
-        adminClient.close();
+        adminClientForDelete.close();
       } catch (Exception e) {
-        LOG.warn("adminClient.close for system " + systemName + " failed with exception.", e);
+        LOG.warn("AdminClient.close() for system {} failed with exception {}.", systemName, e);
       }
     }
+
+    if (adminClient != null) {
+      adminClient.close();
+    }
   }
 
   /**
@@ -481,18 +501,62 @@ public class KafkaSystemAdmin implements SystemAdmin {
   @Override
   public boolean createStream(StreamSpec streamSpec) {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
+    final String REPL_FACTOR = "replication.factor";
+
+    KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
+    String topicName = kSpec.getPhysicalName();
+
+    // create topic.
+    NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
+
+    // specify the configs
+    Map<String, String> streamConfig = new HashMap(streamSpec.getConfig());
+    // HACK - replication.factor is invalid config for AdminClient.createTopics
+    if (streamConfig.containsKey(REPL_FACTOR)) {
+      String repl = streamConfig.get(REPL_FACTOR);
+      LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
+          REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
+      streamConfig.remove(REPL_FACTOR);
+    }
+    newTopic.configs(new MapConfig(streamConfig));
+    CreateTopicsResult result = adminClient.createTopics(ImmutableSet.of(newTopic));
+    try {
+      result.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) {
+        LOG.info("Topic {} already exists.", topicName);
+        return false;
+      }
 
-    return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection());
+      throw new SamzaException(String.format("Creation of topic %s failed.", topicName), e);
+    }
+    LOG.info("Successfully created topic {}", topicName);
+    DescribeTopicsResult desc = adminClient.describeTopics(ImmutableSet.of(topicName));
+    try {
+      TopicDescription td = desc.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(topicName);
+      LOG.info("Topic {} created with {}", topicName, td);
+      return true;
+    } catch (Exception e) {
+      LOG.error("'Describe after create' failed for topic " + topicName, e);
+      return false;
+    }
   }
 
   @Override
   public boolean clearStream(StreamSpec streamSpec) {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
 
-    KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection());
+    String topicName = streamSpec.getPhysicalName();
 
-    Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName()));
-    return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty();
+    try {
+      DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableSet.of(topicName));
+      deleteTopicsResult.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to delete topic {} with exception {}.", topicName, e);
+      return false;
+    }
+
+    return true;
   }
 
   /**
@@ -566,10 +630,10 @@ public class KafkaSystemAdmin implements SystemAdmin {
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
-      if (adminClient == null) {
-        adminClient = AdminClient.create(createAdminClientProperties());
+      if (adminClientForDelete == null) {
+        adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties());
       }
-      KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets);
+      KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
       deleteMessageCalled = true;
     }
   }
@@ -594,7 +658,6 @@ public class KafkaSystemAdmin implements SystemAdmin {
     }
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
 
-
     // kafka.admin.AdminUtils requires zkConnect
     // this will change after we move to the new org.apache..AdminClient
     String zkConnect =
index 6ff2b50..89ba1bb 100644 (file)
@@ -22,21 +22,18 @@ package org.apache.samza.system.kafka
 import java.util
 import java.util.Properties
 
-import kafka.admin.{AdminClient, AdminUtils}
-import kafka.utils.{Logging, ZkUtils}
+import kafka.admin.AdminClient
+import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition}
 
 import scala.collection.JavaConverters._
 
 /**
-  * A helper class that is used to construct the changelog stream specific information
+  * A helper class for KafkaSystemAdmin
   *
   * @param replicationFactor The number of replicas for the changelog stream
   * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
@@ -47,88 +44,6 @@ case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties)
 // TODO move to org.apache.kafka.clients.admin.AdminClien from the kafka.admin.AdminClient
 object KafkaSystemAdminUtilsScala extends Logging {
 
-  val CLEAR_STREAM_RETRIES = 3
-  val CREATE_STREAM_RETRIES = 10
-
-  /**
-    * @inheritdoc
-    *
-    * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true".
-    * Otherwise it's a no-op.
-    */
-  def clearStream(spec: StreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Unit = {
-    info("Deleting topic %s for system %s" format(spec.getPhysicalName, spec.getSystemName))
-    val kSpec = KafkaStreamSpec.fromSpec(spec)
-    var retries = CLEAR_STREAM_RETRIES
-    new ExponentialSleepStrategy().run(
-      loop => {
-        val zkClient = connectZk.get()
-        try {
-          AdminUtils.deleteTopic(
-            zkClient,
-            kSpec.getPhysicalName)
-        } finally {
-          zkClient.close
-        }
-
-        loop.done
-      },
-
-      (exception, loop) => {
-        if (retries > 0) {
-          warn("Exception while trying to delete topic %s. Retrying." format (spec.getPhysicalName), exception)
-          retries -= 1
-        } else {
-          warn("Fail to delete topic %s." format (spec.getPhysicalName), exception)
-          loop.done
-          throw exception
-        }
-      })
-  }
-
-
-  def createStream(kSpec: KafkaStreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Boolean = {
-    info("Creating topic %s for system %s" format(kSpec.getPhysicalName, kSpec.getSystemName))
-    var streamCreated = false
-    var retries = CREATE_STREAM_RETRIES
-
-    new ExponentialSleepStrategy(initialDelayMs = 500).run(
-      loop => {
-        val zkClient = connectZk.get()
-        try {
-          AdminUtils.createTopic(
-            zkClient,
-            kSpec.getPhysicalName,
-            kSpec.getPartitionCount,
-            kSpec.getReplicationFactor,
-            kSpec.getProperties)
-        } finally {
-          zkClient.close
-        }
-
-        streamCreated = true
-        loop.done
-      },
-
-      (exception, loop) => {
-        exception match {
-          case e: TopicExistsException =>
-            streamCreated = false
-            loop.done
-          case e: Exception =>
-            if (retries > 0) {
-              warn("Failed to create topic %s. Retrying." format (kSpec.getPhysicalName), exception)
-              retries -= 1
-            } else {
-              error("Failed to create topic %s. Bailing out." format (kSpec.getPhysicalName), exception)
-              throw exception
-            }
-        }
-      })
-
-    streamCreated
-  }
-
   /**
     * A helper method that takes oldest, newest, and upcoming offsets for each
     * system stream partition, and creates a single map from stream name to
index 27601b0..1d305fb 100644 (file)
@@ -21,10 +21,9 @@ package org.apache.samza.system.kafka;
 
 import com.google.common.collect.ImmutableSet;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import kafka.api.TopicMetadata;
-import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.samza.Partition;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -221,10 +220,10 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   @Test
   public void testCreateStream() {
     StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
-
+    KafkaSystemAdmin admin = systemAdmin();
     assertTrue("createStream should return true if the stream does not exist and then is created.",
-        systemAdmin().createStream(spec));
-    systemAdmin().validateStream(spec);
+        admin.createStream(spec));
+    admin.validateStream(spec);
 
     assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
   }
@@ -259,16 +258,28 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     systemAdmin().validateStream(spec2);
   }
 
-  //@Test //TODO - currently the connection to ZK fails, but since it checks for empty, the tests succeeds.  SAMZA-1887
+  @Test
   public void testClearStream() {
     StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
 
-    assertTrue("createStream should return true if the stream does not exist and then is created.",
-        systemAdmin().createStream(spec));
-    assertTrue(systemAdmin().clearStream(spec));
+    KafkaSystemAdmin admin = systemAdmin();
+    String topicName = spec.getPhysicalName();
+
+    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
+    // validate topic exists
+    assertTrue(admin.clearStream(spec));
 
-    ImmutableSet<String> topics = ImmutableSet.of(spec.getPhysicalName());
-    Map<String, List<PartitionInfo>> metadata = systemAdmin().getTopicMetadata(topics);
-    assertTrue(metadata.get(spec.getPhysicalName()).isEmpty());
+    // validate that topic was removed
+    DescribeTopicsResult dtr = admin.adminClient.describeTopics(ImmutableSet.of(topicName));
+    try {
+      TopicDescription td = dtr.all().get().get(topicName);
+      Assert.fail("topic " + topicName + " should've been removed. td=" + td);
+    } catch (Exception e) {
+      if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+        // expected
+      } else {
+        Assert.fail("topic " + topicName + " should've been removed. Expected UnknownTopicOrPartitionException.");
+      }
+    }
   }
 }
index 8d92f4d..392670b 100644 (file)
@@ -77,6 +77,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     kcm1.createResources
     kcm1.start
     kcm1.stop
+
     // check that start actually creates the topic with log compaction enabled
     val zkClient = ZkUtils(zkConnect, 6000, 6000, JaasUtils.isZkSecurityEnabled())
     val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
index 095a1b0..a70a756 100644 (file)
@@ -62,7 +62,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   var systemAdmin: KafkaSystemAdmin = null
 
   override def generateConfigs(): Seq[KafkaConfig] = {
-    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableDeleteTopic = true)
     props.map(KafkaConfig.fromProps)
   }