Minor fixes for issues found during testing.
authorPrateek Maheshwari <pmaheshwari@apache.org>
Sat, 13 Oct 2018 19:38:39 +0000 (12:38 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Sat, 13 Oct 2018 19:38:39 +0000 (12:38 -0700)
1. Made public methods in ApplicationDescriptorImpl non-final for mocking.
2. Updated CachedTableDescriptor public method return types to include type parameters.
3. Changed next release version to 1.0.0
4. Made KafkaSystemAdmin#toKafkaSpec public.

Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Shanthoosh Venkatraman <svenkatr@linkedin.com>

Closes #725 from prateekm/minor-fixes

gradle.properties
samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
samza-core/src/main/java/org/apache/samza/table/caching/descriptors/CachingTableDescriptor.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java

index 05d068b..c7e8447 100644 (file)
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 group=org.apache.samza
-version=0.15.1-SNAPSHOT
+version=1.0.0-SNAPSHOT
 scalaVersion=2.11
 
 gradleVersion=2.8
index 743f9dc..359c8f0 100644 (file)
@@ -95,12 +95,12 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   @Override
-  public final Config getConfig() {
+  public Config getConfig() {
     return config;
   }
 
   @Override
-  public final S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+  public S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
     Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
     Preconditions.checkState(getInputStreamIds().isEmpty() && getOutputStreamIds().isEmpty(),
         "Default system must be set before creating any input or output streams.");
@@ -111,25 +111,25 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   @Override
-  public final S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
+  public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
     this.applicationContainerContextFactoryOptional = Optional.of(factory);
     return (S) this;
   }
 
   @Override
-  public final S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
+  public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
     this.applicationTaskContextFactoryOptional = Optional.of(factory);
     return (S) this;
   }
 
   @Override
-  public final S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
+  public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
     this.listenerFactory = listenerFactory;
     return (S) this;
   }
 
   @Override
-  public final S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
+  public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
     this.reporterFactories.clear();
     this.reporterFactories.putAll(reporterFactories);
     return (S) this;
index 10665a3..a256afc 100644 (file)
@@ -114,7 +114,7 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
    * @param readTtl read TTL
    * @return this descriptor
    */
-  public CachingTableDescriptor withReadTtl(Duration readTtl) {
+  public CachingTableDescriptor<K, V> withReadTtl(Duration readTtl) {
     this.readTtl = readTtl;
     return this;
   }
@@ -125,7 +125,7 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
    * @param writeTtl write TTL
    * @return this descriptor
    */
-  public CachingTableDescriptor withWriteTtl(Duration writeTtl) {
+  public CachingTableDescriptor<K, V> withWriteTtl(Duration writeTtl) {
     this.writeTtl = writeTtl;
     return this;
   }
@@ -135,7 +135,7 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
    * @param cacheSize max size of the cache
    * @return this descriptor
    */
-  public CachingTableDescriptor withCacheSize(long cacheSize) {
+  public CachingTableDescriptor<K, V> withCacheSize(long cacheSize) {
     this.cacheSize = cacheSize;
     return this;
   }
@@ -146,7 +146,7 @@ public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V
    * dominant operation and get() has no locality with recent puts.
    * @return this descriptor
    */
-  public CachingTableDescriptor withWriteAround() {
+  public CachingTableDescriptor<K, V> withWriteAround() {
     this.isWriteAround = true;
     return this;
   }
index cb2db10..f761ab3 100644 (file)
@@ -564,7 +564,7 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
    * @param spec a StreamSpec object
    * @return KafkaStreamSpec object
    */
-  KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
+  public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
     KafkaStreamSpec kafkaSpec;
     if (spec.isChangeLogStream()) {
       String topicName = spec.getPhysicalName();