SAMZA-1415: Add clearStream API in SystemAdmin and remove deprecated APIs
authorXinyu Liu <xiliu@xiliu-ld1.linkedin.biz>
Thu, 7 Sep 2017 23:49:20 +0000 (16:49 -0700)
committerXinyu Liu <xiliu@xiliu-ld1.linkedin.biz>
Thu, 7 Sep 2017 23:49:20 +0000 (16:49 -0700)
The patch does the following:

1) add clearStream() APi in SystemAdmin. Currently it's only supported in Kafka with broker configuring delete.topic.enable=true.

2) remove the deprecated APIs including createChangeLogStream(), validateChangelogStream() and createCoordinatorStream().

Author: Xinyu Liu <xiliu@xiliu-ld1.linkedin.biz>

Reviewers: Jake Maes <jacob.maes@gmail.com>

Closes #292 from xinyuiscool/SAMZA-1415

20 files changed:
samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala

index 49531dd..384fecc 100644 (file)
@@ -37,6 +37,12 @@ public class StreamSpec {
 
   private static final int DEFAULT_PARTITION_COUNT = 1;
 
+  // Internal changelog stream id. It is used for creating changelog StreamSpec.
+  private static final String CHANGELOG_STREAM_ID = "samza-internal-changelog-stream-id";
+
+  // Internal coordinator stream id. It is used for creating coordinator StreamSpec.
+  private static final String COORDINATOR_STREAM_ID = "samza-internal-coordinator-stream-id";
+
   /**
    * Unique identifier for the stream in a Samza application.
    * This identifier is used as a key for stream properties in the
@@ -200,6 +206,14 @@ public class StreamSpec {
     return new SystemStream(systemName, physicalName);
   }
 
+  public boolean isChangeLogStream() {
+    return id.equals(CHANGELOG_STREAM_ID);
+  }
+
+  public boolean isCoordinatorStream() {
+    return id.equals(COORDINATOR_STREAM_ID);
+  }
+
   private void validateLogicalIdentifier(String identifierName, String identifierValue) {
     if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
       throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
@@ -220,4 +234,12 @@ public class StreamSpec {
   public int hashCode() {
     return id.hashCode();
   }
+
+  public static StreamSpec createChangeLogStreamSpec(String physicalName, String systemName, int partitionCount) {
+    return new StreamSpec(CHANGELOG_STREAM_ID, physicalName, systemName, partitionCount);
+  }
+
+  public static StreamSpec createCoordinatorStreamSpec(String physicalName, String systemName) {
+    return new StreamSpec(COORDINATOR_STREAM_ID, physicalName, systemName, 1);
+  }
 }
index b180712..e765540 100644 (file)
@@ -50,38 +50,6 @@ public interface SystemAdmin {
   Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
 
   /**
-   * An API to create a change log stream
-   *
-   * @param streamName
-   *          The name of the stream to be created in the underlying stream
-   * @param numOfPartitions
-   *          The number of partitions in the changelog stream
-   * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
-   */
-  void createChangelogStream(String streamName, int numOfPartitions);
-
-  /**
-   * Validates change log stream
-   *
-   * @param streamName
-   *          The name of the stream to be created in the underlying stream
-   * @param numOfPartitions
-   *          The number of partitions in the changelog stream
-   * @deprecated since 0.12.1, use {@link #validateStream(StreamSpec)}
-   */
-  void validateChangelogStream(String streamName, int numOfPartitions);
-
-  /**
-   * Create a stream for the job coordinator. If the stream already exists, this
-   * call should simply return.
-   *
-   * @param streamName
-   *          The name of the coordinator stream to create.
-   * @deprecated since 0.12.1, use {@link #createStream(StreamSpec)}
-   */
-  void createCoordinatorStream(String streamName);
-
-  /**
    * Compare the two offsets. -1, 0, +1 means offset1 &lt; offset2,
    * offset1 == offset2 and offset1 &gt; offset2 respectively. Return
    * null if those two offsets are not comparable
@@ -114,4 +82,14 @@ public interface SystemAdmin {
   default void validateStream(StreamSpec streamSpec) throws StreamValidationException {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Clear the stream described by the spec.
+   * @param streamSpec  The spec for the physical stream on the system.
+   * @return {@code true} if the stream was successfully cleared.
+   *         {@code false} if clearing stream failed.
+   */
+  default boolean clearStream(StreamSpec streamSpec) {
+    throw new UnsupportedOperationException();
+  }
 }
index 2157e69..49f7da0 100644 (file)
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
@@ -56,16 +55,6 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
   }
 
   @Override
-  public void createChangelogStream(String streamName, int numOfPartitions) {
-    throw new SamzaException("Method not implemented");
-  }
-
-  @Override
-  public void validateChangelogStream(String streamName, int numOfPartitions) {
-    throw new SamzaException("Method not implemented");
-  }
-
-  @Override
   public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
     Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
 
@@ -77,11 +66,6 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
   }
 
   @Override
-  public void createCoordinatorStream(String streamName) {
-    throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
-  }
-
-  @Override
   public Integer offsetComparator(String offset1, String offset2) {
     return null;
   }
index 6319173..42bedec 100644 (file)
@@ -304,7 +304,13 @@ object JobModelManager extends Logging {
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
         ).getAdmin(systemStream.getSystem, config)
 
-      systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
+      val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions)
+      if (systemAdmin.createStream(changelogSpec)) {
+        info("Created changelog stream %s." format systemStream.getStream)
+      } else {
+        info("Changelog stream %s already exists." format systemStream.getStream)
+      }
+      systemAdmin.validateStream(changelogSpec)
     }
   }
 
index f34db99..0e973e9 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
 import org.apache.samza.runtime.ApplicationRunnerOperation
+import org.apache.samza.system.StreamSpec
 import org.apache.samza.util.{Logging, Util}
 
 import scala.collection.JavaConverters._
@@ -85,7 +86,13 @@ class JobRunner(config: Config) extends Logging {
     info("Creating coordinator stream")
     val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
     val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
-    systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream)
+    val streamName = coordinatorSystemStream.getStream
+    val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
+    if (systemAdmin.createStream(coordinatorSpec)) {
+      info("Created coordinator stream %s." format streamName)
+    } else {
+      info("Coordinator stream %s already exists." format streamName)
+    }
 
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")
index 977ac5b..0879e9a 100644 (file)
@@ -25,14 +25,7 @@ import java.util
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.container.TaskName
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamPartitionIterator
-import org.apache.samza.system.ExtendedSystemAdmin
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import org.apache.samza.util.Clock
@@ -218,7 +211,8 @@ class TaskStorageManager(
       val systemAdmin = systemAdmins
         .getOrElse(systemStream.getSystem,
                    throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
-      systemAdmin.validateChangelogStream(systemStream.getStream, changeLogStreamPartitions)
+      val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
+      systemAdmin.validateStream(changelogSpec)
     }
 
     val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
index 662c737..6413413 100644 (file)
@@ -25,14 +25,7 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.*;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.apache.samza.util.Util;
 
@@ -208,8 +201,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   }
 
   public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
-    public void createCoordinatorStream(String streamName) {
+    @Override
+    public boolean createStream(StreamSpec streamSpec) {
       // Do nothing.
+      return true;
     }
   }
 
index 2c8f682..c4fd8f7 100644 (file)
@@ -93,21 +93,6 @@ public class TestExecutionPlanner {
       }
 
       @Override
-      public void createChangelogStream(String streamName, int numOfPartitions) {
-
-      }
-
-      @Override
-      public void validateChangelogStream(String streamName, int numOfPartitions) {
-
-      }
-
-      @Override
-      public void createCoordinatorStream(String streamName) {
-
-      }
-
-      @Override
       public Integer offsetComparator(String offset1, String offset2) {
         return null;
       }
index abfc63f..48504a9 100644 (file)
@@ -340,18 +340,6 @@ class TestOffsetManager {
       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
         Map[String, SystemStreamMetadata]().asJava
 
-      override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-        new UnsupportedOperationException("Method not implemented.")
-      }
-
-      override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-        new UnsupportedOperationException("Method not implemented.")
-      }
-
-      override def createCoordinatorStream(streamName: String) {
-        new UnsupportedOperationException("Method not implemented.")
-      }
-
       override def offsetComparator(offset1: String, offset2: String) = null
     }
   }
index 9025077..dcb06d3 100644 (file)
@@ -398,9 +398,6 @@ class TestTaskInstance {
 class MockSystemAdmin extends SystemAdmin {
   override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { offsets }
   override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null
-  override def createCoordinatorStream(stream: String) = {}
-  override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {}
-  override def validateChangelogStream(topicName: String, numOfPartitions: Int) = {}
 
   override def offsetComparator(offset1: String, offset2: String) = {
     offset1.toLong compare offset2.toLong
index 0b6dd8b..e6b148b 100644 (file)
@@ -318,18 +318,6 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
     Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava
   }
 
-  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def createCoordinatorStream(streamName: String) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
   override def offsetComparator(offset1: String, offset2: String) = null
 
   override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
index 2495baf..ea4d37b 100644 (file)
@@ -95,7 +95,8 @@ class TestTaskStorageManager extends MockitoSugar {
     val mockStreamMetadataCache = mock[StreamMetadataCache]
     val mockSystemConsumer = mock[SystemConsumer]
     val mockSystemAdmin = mock[SystemAdmin]
-    doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1)
+    val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", "kafka", 1)
+    doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
     var registerOffset = "0"
     when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
@@ -204,7 +205,8 @@ class TestTaskStorageManager extends MockitoSugar {
     // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
     val mockStreamMetadataCache = mock[StreamMetadataCache]
     val mockSystemAdmin = mock[SystemAdmin]
-    doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1)
+    val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", "kafka", 1)
+    doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
 
     val mockSystemConsumer = mock[SystemConsumer]
     when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
index 446534a..3cadce0 100644 (file)
@@ -54,21 +54,6 @@ public class ElasticsearchSystemAdmin implements SystemAdmin {
   }
 
   @Override
-  public void createChangelogStream(String stream, int foo) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void createCoordinatorStream(String streamName) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void validateChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public Integer offsetComparator(String offset1, String offset2) {
          throw new UnsupportedOperationException();
   }
index f5b05fb..9251db0 100644 (file)
@@ -201,21 +201,6 @@ public class HdfsSystemAdmin implements SystemAdmin {
     return systemStreamMetadataMap;
   }
 
-  @Override
-  public void createChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
-  }
-
-  @Override
-  public void validateChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException("HDFS doesn't support change log stream.");
-  }
-
-  @Override
-  public void createCoordinatorStream(String streamName) {
-    throw new UnsupportedOperationException("HDFS doesn't support coordinator stream.");
-  }
-
   /**
    * Compare two multi-file style offset. A multi-file style offset consist of both
    * the file index as well as the offset within that file. And the format of it is:
index af77d5b..6e582e9 100644 (file)
@@ -37,9 +37,8 @@ import scala.collection.JavaConverters._
 
 
 object KafkaSystemAdmin extends Logging {
-  // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
-  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
-  val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+
+  val CLEAR_STREAM_RETRIES = 3
 
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
@@ -328,23 +327,11 @@ class KafkaSystemAdmin(
      offset
   }
 
-  override def createCoordinatorStream(streamName: String) {
-    info("Attempting to create coordinator stream %s." format streamName)
-
-    val streamSpec = new KafkaStreamSpec(streamName, streamName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
-
-    if (createStream(streamSpec)) {
-      info("Created coordinator stream %s." format streamName)
-    } else {
-      info("Coordinator stream %s already exists." format streamName)
-    }
-  }
-
   /**
    * Helper method to use topic metadata cache when fetching metadata, so we
    * don't hammer Kafka more than we need to.
    */
-  protected def getTopicMetadata(topics: Set[String]) = {
+  def getTopicMetadata(topics: Set[String]) = {
     new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
       .getTopicInfo(topics)
   }
@@ -415,7 +402,7 @@ class KafkaSystemAdmin(
    * @inheritdoc
    */
   override def createStream(spec: StreamSpec): Boolean = {
-    val kSpec = KafkaStreamSpec.fromSpec(spec);
+    val kSpec = toKafkaSpec(spec)
     var streamCreated = false
 
     new ExponentialSleepStrategy(initialDelayMs = 500).run(
@@ -451,6 +438,23 @@ class KafkaSystemAdmin(
   }
 
   /**
+   * Converts a StreamSpec into a KafakStreamSpec. Special handling for coordinator and changelog stream.
+   * @param spec a StreamSpec object
+   * @return KafkaStreamSpec object
+   */
+  def toKafkaSpec(spec: StreamSpec): KafkaStreamSpec = {
+    if (spec.isChangeLogStream) {
+      val topicName = spec.getPhysicalName
+      val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName))
+      new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, topicMeta.kafkaProps)
+    } else if (spec.isCoordinatorStream){
+      new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+    } else {
+      KafkaStreamSpec.fromSpec(spec)
+    }
+  }
+
+  /**
     * @inheritdoc
     *
     * Validates a stream in Kafka. Should not be called before createStream(),
@@ -491,32 +495,41 @@ class KafkaSystemAdmin(
   }
 
   /**
-    * Exception to be thrown when the change log stream creation or validation has failed
-    */
-  class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
-    def this(s: String) = this(s, null)
-  }
-  
-  override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
-    val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
-    val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+   * @inheritdoc
+   *
+   * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true".
+   * Otherwise it's a no-op.
+   */
+  override def clearStream(spec: StreamSpec): Boolean = {
+    val kSpec = KafkaStreamSpec.fromSpec(spec)
+    var retries = CLEAR_STREAM_RETRIES
+    new ExponentialSleepStrategy().run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.deleteTopic(
+            zkClient,
+            kSpec.getPhysicalName)
+        } finally {
+          zkClient.close
+        }
 
-    if (createStream(spec)) {
-      info("Created changelog stream %s." format topicName)
-    } else {
-      info("Changelog stream %s already exists." format topicName)
-    }
+        loop.done
+      },
 
-    validateStream(spec)
-  }
+      (exception, loop) => {
+        if (retries > 0) {
+          warn("Exception while trying to delete topic %s: %s. Retrying." format (spec.getPhysicalName, exception))
+          retries -= 1
+        } else {
+          warn("Fail to delete topic %s: %s" format (spec.getPhysicalName, exception))
+          loop.done
+          throw exception
+        }
+      })
 
-  /**
-    * Validates a stream in Kafka. Should not be called before createStream(),
-    * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients, is not read-only and
-    * will auto-create a new topic.
-    */
-  override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
-    validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions))
+    val topicMetadata = getTopicMetadata(Set(kSpec.getPhysicalName)).get(kSpec.getPhysicalName).get
+    topicMetadata.partitionsMetadata.isEmpty
   }
 
   /**
index ce59b40..51af518 100644 (file)
 
 package org.apache.samza.system.kafka;
 
+import java.util.*;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
+
+import kafka.api.TopicMetadata;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.Util;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.*;
@@ -39,53 +40,48 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
 
   @Test
-  public void testCreateCoordinatorStreamDelegatesToCreateStream() {
+  public void testCreateCoordinatorStream() {
     KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000);
     SystemAdmin admin = Mockito.spy(systemAdmin);
-    StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", "testSystem");
+    StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
 
-    admin.createCoordinatorStream(spec.getPhysicalName());
+    admin.createStream(spec);
     admin.validateStream(spec);
 
     Mockito.verify(admin).createStream(Mockito.any());
   }
 
   @Test
-  public void testCreateChangelogStreamDelegatesToCreateStream() {
-    final String STREAM = "testChangeLogStream";
-    final int PARTITIONS = 12;
-    final int REP_FACTOR = 3;
+  public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
+    final String STREAM = "test.coordinator_test.Stream";
 
     Properties coordProps = new Properties();
-    Properties changeLogProps = new Properties();
-    changeLogProps.setProperty("cleanup.policy", "compact");
-    changeLogProps.setProperty("segment.bytes", "139");
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
-    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
-    StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
-    admin.createChangelogStream(STREAM, PARTITIONS);
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
+
+    Mockito.doAnswer(invocationOnMock -> {
+      StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
+      assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
+      assertTrue(internalSpec.isCoordinatorStream());
+      assertEquals(SYSTEM(), internalSpec.getSystemName());
+      assertEquals(STREAM, internalSpec.getPhysicalName());
+      assertEquals(1, internalSpec.getPartitionCount());
+
+      return internalSpec;
+    }).when(admin).toKafkaSpec(Mockito.any());
+
+    admin.createStream(spec);
     admin.validateStream(spec);
 
-    ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
-    Mockito.verify(admin).createStream(specCaptor.capture());
-
-    StreamSpec internalSpec = specCaptor.getValue();
-    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
-    assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
-    assertEquals(SYSTEM(), internalSpec.getSystemName());
-    assertEquals(STREAM, internalSpec.getPhysicalName());
-    assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
-    assertEquals(PARTITIONS, internalSpec.getPartitionCount());
-    assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
   }
 
   @Test
-  public void testCreateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
-    final String STREAM = "test.Change_Log.Stream";
+  public void testCreateChangelogStream() {
+    final String STREAM = "testChangeLogStream";
     final int PARTITIONS = 12;
-    final int REP_FACTOR = 3;
+    final int REP_FACTOR = 1;
 
     Properties coordProps = new Properties();
     Properties changeLogProps = new Properties();
@@ -94,60 +90,56 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
-    StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
-    admin.createChangelogStream(STREAM, PARTITIONS);
-    admin.validateStream(spec);
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
 
-    ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
-    Mockito.verify(admin).createStream(specCaptor.capture());
-
-    StreamSpec internalSpec = specCaptor.getValue();
-    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
-    assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
-    assertEquals(SYSTEM(), internalSpec.getSystemName());
-    assertEquals(STREAM, internalSpec.getPhysicalName());
-    assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
-    assertEquals(PARTITIONS, internalSpec.getPartitionCount());
-    assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
-  }
+    Mockito.doAnswer(invocationOnMock -> {
+      StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
+      assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
+      assertTrue(internalSpec.isChangeLogStream());
+      assertEquals(SYSTEM(), internalSpec.getSystemName());
+      assertEquals(STREAM, internalSpec.getPhysicalName());
+      assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+      assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+      assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
 
-  @Test
-  public void testValidateChangelogStreamDelegatesToValidateStream() {
-    final String STREAM = "testChangeLogValidate";
-    Properties coordProps = new Properties();
-    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
-    changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
-
-    KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
-    SystemAdmin admin = Mockito.spy(systemAdmin);
-    StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+      return internalSpec;
+    }).when(admin).toKafkaSpec(Mockito.any());
 
-    admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+    admin.createStream(spec);
     admin.validateStream(spec);
-    admin.validateChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
-
-    Mockito.verify(admin).createStream(Mockito.any());
-    Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
   }
 
   @Test
-  public void testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
-    final String STREAM = "test.Change_Log.Validate";
+  public void testCreateChangelogStreamWithSpecialCharsInTopicName() {
+    final String STREAM = "test.Change_Log.Stream";
+    final int PARTITIONS = 12;
+    final int REP_FACTOR = 1;
+
     Properties coordProps = new Properties();
+    Properties changeLogProps = new Properties();
+    changeLogProps.setProperty("cleanup.policy", "compact");
+    changeLogProps.setProperty("segment.bytes", "139");
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
-    changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
-
-    KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
-    SystemAdmin admin = Mockito.spy(systemAdmin);
-    StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
+    Mockito.doAnswer(invocationOnMock -> {
+      StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
+      assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec is used to carry replication factor
+      assertTrue(internalSpec.isChangeLogStream());
+      assertEquals(SYSTEM(), internalSpec.getSystemName());
+      assertEquals(STREAM, internalSpec.getPhysicalName());
+      assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
+      assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+      assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
+
+      return internalSpec;
+    }).when(admin).toKafkaSpec(Mockito.any());
+
+    admin.createStream(spec);
     admin.validateStream(spec);
-    admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should not throw
-
-    Mockito.verify(admin).createStream(Mockito.any());
-    Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
   }
 
   @Test
@@ -191,4 +183,17 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
 
     admin.validateStream(spec2);
   }
+
+  @Test
+  public void testClearStream() {
+    KafkaSystemAdmin admin = this.basicSystemAdmin;
+    StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
+
+    assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
+    assertTrue(admin.clearStream(spec));
+
+    scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
+    scala.collection.immutable.Map<String, TopicMetadata> metadata = admin.getTopicMetadata(topic);
+    assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
+  }
 }
index 19f3903..6fb03a1 100644 (file)
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.Partition
 import org.apache.samza.config.KafkaProducerConfig
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
 import org.junit._
@@ -283,7 +283,8 @@ class TestKafkaSystemAdmin {
     val topic = "test-coordinator-stream"
     val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
 
-    systemAdmin.createCoordinatorStream(topic)
+    val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
+    systemAdmin.createStream(spec)
     validateTopic(topic, 1)
     val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
     assertTrue(topicMetadataMap.contains(topic))
index a05f89a..322b367 100644 (file)
@@ -66,21 +66,6 @@ public class MockSystemAdmin implements SystemAdmin {
   }
 
   @Override
-  public void createChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException("Method not implemented");
-  }
-
-  @Override
-  public void validateChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException("Method not implemented");
-  }
-
-  @Override
-  public void createCoordinatorStream(String streamName) {
-    throw new UnsupportedOperationException("Method not implemented.");
-  }
-
-  @Override
   public Integer offsetComparator(String offset1, String offset2) {
     return null;
   }
index 41f01c5..8890a2f 100644 (file)
@@ -63,21 +63,6 @@ public class SimpleSystemAdmin implements SystemAdmin {
   }
 
   @Override
-  public void createChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void validateChangelogStream(String streamName, int numOfPartitions) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void createCoordinatorStream(String streamName) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public Integer offsetComparator(String offset1, String offset2) {
     if (offset1 == null) {
       return offset2 == null ? 0 : -1;
index c320a97..5650d4b 100644 (file)
@@ -38,17 +38,5 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
     }).toMap.asJava
   }
 
-  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
-  override def createCoordinatorStream(streamName: String) {
-    new UnsupportedOperationException("Method not implemented.")
-  }
-
   override def offsetComparator(offset1: String, offset2: String) = null
 }