System and Stream Descriptor API cleanup.
authorPrateek Maheshwari <pmaheshwari@apache.org>
Thu, 11 Oct 2018 23:35:08 +0000 (16:35 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Thu, 11 Oct 2018 23:35:08 +0000 (16:35 -0700)
Major changes:
1. Made withPhysicalName(String) a protected method in StreamDescriptor.
The primary reason to set the physical name is to use stream names with special characters in them, which streamId doesn't support. This change is to make it so that the physical name setter should only be exposed by systems where it means something useful - like HDFS.
2. Renamed some methods in StreamDescriptor for clarity.

Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Cameron Lee <calee@linkedin.com>

Closes #708 from prateekm/stream-descriptor-cleanup

samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java
samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java
samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java
samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java
samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java
samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java
samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java
samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java

index a2df29a..09dd381 100644 (file)
@@ -40,4 +40,9 @@ public final class GenericInputDescriptor<StreamMessageType>
   GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
     super(streamId, serde, systemDescriptor, null);
   }
+
+  @Override
+  public GenericInputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
+    return super.withPhysicalName(physicalName);
+  }
 }
index b13ac21..155bd4e 100644 (file)
@@ -40,4 +40,9 @@ public final class GenericOutputDescriptor<StreamMessageType>
   GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
     super(streamId, serde, systemDescriptor);
   }
+
+  @Override
+  public GenericOutputDescriptor<StreamMessageType> withPhysicalName(String physicalName) {
+    return super.withPhysicalName(physicalName);
+  }
 }
index bb38ee4..708dd2a 100644 (file)
@@ -73,16 +73,15 @@ public abstract class InputDescriptor<StreamMessageType, SubClass extends InputD
   }
 
   /**
-   * If set to true, when a Samza container starts up, it ignores any checkpointed offset for this particular
+   * If set, when a Samza container starts up, it ignores any checkpointed offset for this particular
    * input stream. Its behavior is thus determined by the {@link #withOffsetDefault} setting.
    * Note that the reset takes effect every time a container is started, which may be every time you restart your job,
    * or more frequently if a container fails and is restarted by the framework.
    *
-   * @param resetOffset whether the container should ignore any checkpointed offset when starting
    * @return this input descriptor
    */
-  public SubClass withResetOffset(boolean resetOffset) {
-    this.resetOffsetOptional = Optional.of(resetOffset);
+  public SubClass shouldResetOffset() {
+    this.resetOffsetOptional = Optional.of(true);
     return (SubClass) this;
   }
 
@@ -126,38 +125,35 @@ public abstract class InputDescriptor<StreamMessageType, SubClass extends InputD
   }
 
   /**
-   * If set to true, this stream will be processed as a bootstrap stream. This means that every time a Samza container
+   * If set, this stream will be processed as a bootstrap stream. This means that every time a Samza container
    * starts up, this stream will be fully consumed before messages from any other stream are processed.
    *
-   * @param bootstrap whether this stream should be processed as a bootstrap stream
    * @return this input descriptor
    */
-  public SubClass withBootstrap(boolean bootstrap) {
-    this.isBootstrapOptional = Optional.of(bootstrap);
+  public SubClass shouldBootstrap() {
+    this.isBootstrapOptional = Optional.of(true);
     return (SubClass) this;
   }
 
   /**
-   * If set to true, this stream will be considered a bounded stream. If all input streams in an application are
+   * If set, this stream will be considered a bounded stream. If all input streams in an application are
    * bounded, the job is considered to be running in batch processing mode.
    *
-   * @param isBounded whether this stream is a bounded
    * @return this input descriptor
    */
-  public SubClass withBounded(boolean isBounded) {
-    this.isBoundedOptional = Optional.of(isBounded);
+  public SubClass isBounded() {
+    this.isBoundedOptional = Optional.of(true);
     return (SubClass) this;
   }
 
   /**
-   * If set to true, and supported by the system implementation, messages older than the latest checkpointed offset
+   * If set, and supported by the system implementation, messages older than the latest checkpointed offset
    * for this stream may be deleted after the commit.
    *
-   * @param deleteCommittedMessages whether the system should attempt to delete checkpointed messages
    * @return this input descriptor
    */
-  public SubClass withDeleteCommittedMessages(boolean deleteCommittedMessages) {
-    this.deleteCommittedMessagesOptional = Optional.of(deleteCommittedMessages);
+  public SubClass shouldDeleteCommittedMessages() {
+    this.deleteCommittedMessagesOptional = Optional.of(true);
     return (SubClass) this;
   }
 
index e2f93db..f7de728 100644 (file)
@@ -82,7 +82,7 @@ public abstract class StreamDescriptor<StreamMessageType, SubClass extends Strea
    * @param physicalName physical name for this stream.
    * @return this stream descriptor.
    */
-  public SubClass withPhysicalName(String physicalName) {
+  protected SubClass withPhysicalName(String physicalName) {
     this.physicalNameOptional = Optional.ofNullable(physicalName);
     return (SubClass) this;
   }
index e635338..1ec81ce 100644 (file)
@@ -37,15 +37,13 @@ public class TestExpandingInputDescriptor {
     ExampleExpandingOutputDescriptor<Integer> output1 = expandingSystem.getOutputDescriptor("output1", new IntegerSerde());
 
     input1
-        .withBootstrap(false)
+        .shouldBootstrap()
         .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
-        .withPhysicalName("input-1")
         .withPriority(1)
-        .withResetOffset(false)
+        .shouldResetOffset()
         .withStreamConfigs(Collections.emptyMap());
 
     output1
-        .withPhysicalName("output-1")
         .withStreamConfigs(Collections.emptyMap());
   }
 
index 9b39d41..012da1b 100644 (file)
@@ -45,12 +45,12 @@ public class TestGenericInputDescriptor {
 
     input1
         .withPhysicalName("input-1")
-        .withBootstrap(false)
+        .shouldBootstrap()
         .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
         .withPriority(1)
-        .withResetOffset(false)
-        .withBounded(false)
-        .withDeleteCommittedMessages(true)
+        .shouldResetOffset()
+        .isBounded()
+        .shouldDeleteCommittedMessages()
         .withStreamConfigs(Collections.emptyMap());
 
     output1
@@ -68,12 +68,12 @@ public class TestGenericInputDescriptor {
 
     GenericInputDescriptor<Double> isd = mySystem.getInputDescriptor("input-stream", new DoubleSerde())
             .withPhysicalName("physical-name")
-            .withBootstrap(true)
-            .withBounded(true)
-            .withDeleteCommittedMessages(true)
+            .shouldBootstrap()
+            .isBounded()
+            .shouldDeleteCommittedMessages()
             .withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST)
             .withPriority(12)
-            .withResetOffset(true)
+            .shouldResetOffset()
             .withStreamConfigs(ImmutableMap.of("custom-config-key", "custom-config-value"));
 
     Map<String, String> generatedConfigs = isd.toConfig();
index d8b51c5..b013db4 100644 (file)
@@ -41,15 +41,13 @@ public class TestSimpleInputDescriptor {
     ExampleSimpleOutputDescriptor<Integer> output1 = kafkaSystem.getOutputDescriptor("output1", new IntegerSerde());
 
     input1
-        .withBootstrap(false)
+        .shouldBootstrap()
         .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
-        .withPhysicalName("input-1")
         .withPriority(1)
-        .withResetOffset(false)
+        .shouldResetOffset()
         .withStreamConfigs(Collections.emptyMap());
 
     output1
-        .withPhysicalName("output-1")
         .withStreamConfigs(Collections.emptyMap());
   }
 
index 60e4819..f53f66d 100644 (file)
@@ -40,15 +40,13 @@ public class TestTransformingInputDescriptor {
     ExampleTransformingOutputDescriptor<Integer> output1 = imeTransformingSystem.getOutputDescriptor("output1", new IntegerSerde());
 
     input1
-        .withBootstrap(false)
+        .shouldBootstrap()
         .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST)
-        .withPhysicalName("input-1")
         .withPriority(1)
-        .withResetOffset(false)
+        .shouldResetOffset()
         .withStreamConfigs(Collections.emptyMap());
 
     output1
-        .withPhysicalName("output-1")
         .withStreamConfigs(Collections.emptyMap());
   }
 
index ac90cf0..689f9c9 100644 (file)
@@ -41,15 +41,13 @@ public class TestKafkaInputDescriptor {
 
     KafkaInputDescriptor<KV<String, Integer>> isd =
         sd.getInputDescriptor("input-stream", KVSerde.of(new StringSerde(), new IntegerSerde()))
-            .withPhysicalName("physical-name")
             .withConsumerAutoOffsetReset("largest")
             .withConsumerFetchMessageMaxBytes(1024 * 1024);
 
     Map<String, String> generatedConfigs = isd.toConfig();;
     assertEquals("kafka", generatedConfigs.get("streams.input-stream.samza.system"));
-    assertEquals("physical-name", generatedConfigs.get("streams.input-stream.samza.physical.name"));
-    assertEquals("largest", generatedConfigs.get("systems.kafka.streams.physical-name.consumer.auto.offset.reset"));
-    assertEquals("1048576", generatedConfigs.get("systems.kafka.streams.physical-name.consumer.fetch.message.max.bytes"));
+    assertEquals("largest", generatedConfigs.get("systems.kafka.streams.input-stream.consumer.auto.offset.reset"));
+    assertEquals("1048576", generatedConfigs.get("systems.kafka.streams.input-stream.consumer.fetch.message.max.bytes"));
   }
 
   @Test