SAMZA-1943 Remove ExtendedSystemAdmin and deprecated getNewestOffsets method.
authorBoris S <boryas@apache.org>
Wed, 31 Oct 2018 20:45:28 +0000 (13:45 -0700)
committerBoris S <bshkolnik@linkedin.com>
Wed, 31 Oct 2018 20:45:28 +0000 (13:45 -0700)
Author: Boris S <boryas@apache.org>
Author: Boris S <bshkolnik@linkedin.com>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Bharath Kumarasubramanian <bkumarasubramanian@linkedin.com>

Closes #782 from sborya/removeExtendedSystemAdmin

samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java [deleted file]
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java
samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala

diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
deleted file mode 100644 (file)
index ba239dc..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Interface extends the more generic SystemAdmin interface
- * TODO: Merge this interface method with SystemAdmin when we upgrade to JDK 1.8
- */
-public interface ExtendedSystemAdmin extends SystemAdmin {
-  Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL);
-
-  /**
-   * Deprecated: Use {@link SystemAdmin#getSSPMetadata}, ideally combined with caching (i.e. SSPMetadataCache).
-   * Makes fewer offset requests than getSystemStreamMetadata
-   */
-  @Deprecated
-  String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries);
-}
index 16f90e9..6ee7df2 100644 (file)
@@ -144,4 +144,16 @@ public interface SystemAdmin {
 
   }
 
+  /**
+   * Get partitions counts only. Should be more efficient then getSystemStreamMetadata, but if not implemented
+   * revert to getSystemStreamMetadata.
+   * @param streamNames set of streams to query.
+   * @param cacheTTL cacheTTL to use if caching the values.
+   * @return A map from stream name to SystemStreamMetadata for each stream
+   *        requested in the parameter set.
+   */
+  default Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
+    return getSystemStreamMetadata(streamNames);
+  }
+
 }
index 85245e3..a725ce1 100644 (file)
@@ -112,5 +112,5 @@ public class TestSystemAdmin {
    * Looks like Mockito 1.x does not support using thenCallRealMethod with default methods for interfaces, but it works
    * to use this placeholder abstract class.
    */
-  private abstract class MySystemAdmin implements ExtendedSystemAdmin { }
+  private abstract class MySystemAdmin implements SystemAdmin { }
 }
\ No newline at end of file
index 637858b..edffac7 100644 (file)
@@ -62,8 +62,8 @@ class StreamMetadataCache (
       .flatMap {
         case (systemName, systemStreams) =>
           val systemAdmin = systemAdmins.getSystemAdmin(systemName)
-          val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
-            systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
+          val streamToMetadata = if (partitionsMetadataOnly) {
+            systemAdmin.getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
           } else {
             systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream).asJava)
           }
index a9c57da..e3030ab 100644 (file)
@@ -111,7 +111,7 @@ public class MockSystemFactory implements SystemFactory {
   }
 
   public SystemAdmin getAdmin(String systemName, Config config) {
-    return new ExtendedSystemAdmin() {
+    return new SystemAdmin() {
 
       @Override
       public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
@@ -161,12 +161,7 @@ public class MockSystemFactory implements SystemFactory {
       public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
         return getSystemStreamMetadata(streamNames);
       }
-
-      @Override
-      public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) {
-        return null;
-      }
-
+      
       @Override
       public boolean createStream(StreamSpec streamSpec) {
         return true;
index bed013c..4bb7adf 100644 (file)
@@ -82,7 +82,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     ).asJava)
     TestCheckpointTool.checkpointManager = mock[CheckpointManager]
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
-    when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo").asJava))
+    when(TestCheckpointTool.systemAdmin.getSystemStreamPartitionCounts(Set("foo").asJava, 0))
       .thenReturn(Map("foo" -> metadata).asJava)
     when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
       .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava))
index d2ceafb..16a2e67 100644 (file)
@@ -43,9 +43,9 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.SystemConfig;
-import org.apache.samza.system.ExtendedSystemAdmin;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.ExponentialSleepStrategy;
@@ -64,7 +64,7 @@ import scala.runtime.BoxedUnit;
 import static org.apache.samza.config.KafkaConsumerConfig.*;
 
 
-public class KafkaSystemAdmin implements ExtendedSystemAdmin {
+public class KafkaSystemAdmin implements SystemAdmin {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemAdmin.class);
 
   // Default exponential sleep strategy values
@@ -354,41 +354,6 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
     return result;
   }
 
-  @Override
-  public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) {
-    LOG.info("Fetching newest offset for: {}", ssp);
-
-    ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
-        DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
-
-    Function1<ExponentialSleepStrategy.RetryLoop, String> fetchNewestOffset =
-        new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String>() {
-          @Override
-          public String apply(ExponentialSleepStrategy.RetryLoop loop) {
-            String result = fetchNewestOffset(ssp);
-            loop.done();
-            return result;
-          }
-        };
-
-    String offset = strategy.run(fetchNewestOffset,
-        new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
-          @Override
-          public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
-            if (loop.sleepCount() < maxRetries) {
-              LOG.warn(String.format("Fetching newest offset for: %s threw an exception. Retrying.", ssp), exception);
-            } else {
-              LOG.error(String.format("Fetching newest offset for: %s threw an exception.", ssp), exception);
-              loop.done();
-              throw new SamzaException("Exception while trying to get newest offset", exception);
-            }
-            return null;
-          }
-        }).get();
-
-    return offset;
-  }
-
   /**
    * Convert TopicPartition to SystemStreamPartition
    * @param topicPartition the topic partition to be created
index e7ff749..f16e507 100644 (file)
@@ -154,7 +154,7 @@ class KafkaSystemAdmin(
   /**
    * Whether deleteMessages() API can be used
    */
-  deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging {
+  deleteCommittedMessages: Boolean = false) extends SystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
@@ -303,70 +303,6 @@ class KafkaSystemAdmin(
   }
 
   /**
-   * Returns the newest offset for the specified SSP.
-   * This method is fast and targeted. It minimizes the number of kafka requests.
-   * It does not retry indefinitely if there is any failure.
-   * It returns null if the topic is empty. To get the offsets for *all*
-   * partitions, it would be more efficient to call getSystemStreamMetadata
-   */
-  override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
-    debug("Fetching newest offset for: %s" format ssp)
-    var offset: String = null
-    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
-    var retries = maxRetries
-    new ExponentialSleepStrategy().run(
-      loop => {
-        val metadata = TopicMetadataCache.getTopicMetadata(
-          Set(ssp.getStream),
-          systemName,
-          getTopicMetadata,
-          metadataTTL)
-        debug("Got metadata for streams: %s" format metadata)
-
-        val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
-        val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId)
-        val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1
-
-        // Get oldest, newest, and upcoming offsets for each topic and partition.
-        debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition))
-        val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
-        try {
-          offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2
-
-          // Kafka's "latest" offset is always last message in stream's offset +
-          // 1, so get newest message in stream by subtracting one. this is safe
-          // even for key-deduplicated streams, since the last message will
-          // never be deduplicated.
-          if (offset.toLong <= 0) {
-            debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
-            offset = null
-          } else {
-            offset = (offset.toLong - 1).toString
-          }
-        } finally {
-          consumer.close
-        }
-
-        debug("Got offset %s for %s." format(offset, ssp))
-        loop.done
-      },
-
-      (exception, loop) => {
-        if (retries > 0) {
-          warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception))
-          metadataTTL = 0L // Force metadata refresh
-          retries -= 1
-        } else {
-          warn("Exception while trying to get offset for %s" format(ssp), exception)
-          loop.done
-          throw exception
-        }
-      })
-
-     offset
-  }
-
-  /**
    * Helper method to use topic metadata cache when fetching metadata, so we
    * don't hammer Kafka more than we need to.
    */
index 1570363..095a1b0 100644 (file)
@@ -21,6 +21,7 @@
 
 package org.apache.samza.system.kafka
 
+import com.google.common.collect.ImmutableSet
 import kafka.admin.AdminUtils
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.integration.KafkaServerTestHarness
@@ -316,23 +317,25 @@ class TestKafkaSystemAdmin {
     val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
     val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
 
-    assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
-    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+    assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+    assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
 
     // Add a new message to one of the partitions, and verify that it works as expected.
     assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
-    assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
-    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+    assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
 
     // Again
     assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
-    assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
-    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+    assertEquals("1", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+    assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
 
     // Add a message to both partitions
     assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
     assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
-    assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
-    assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
+    assertEquals("2", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+    assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
+
   }
 }
\ No newline at end of file
index bf64c03..93dad09 100644 (file)
@@ -307,45 +307,4 @@ class TestKafkaSystemAdmin {
       case e: ExponentialSleepStrategy.CallLimitReached => ()
     }
   }
-
-  @Test
-  def testGetNewestOffset {
-    createTopic(TOPIC2, 16)
-    validateTopic(TOPIC2, 16)
-
-    val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
-    val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
-
-    assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
-    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
-
-    // Add a new message to one of the partitions, and verify that it works as expected.
-    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
-    assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
-    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
-
-    // Again
-    assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
-    assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
-    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
-
-    // Add a message to both partitions
-    assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
-    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
-    assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
-    assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
-  }
-
-  @Test (expected = classOf[LeaderNotAvailableException])
-  def testGetNewestOffsetMaxRetry {
-    val expectedRetryCount = 3
-    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
-    try {
-      systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3)
-    } catch {
-      case e: Exception =>
-        assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount)
-        throw e
-    }
-  }
 }