SAMZA-1263: Samza Fluent: NPE is streams.id.samza.system is missing
authorJacob Maes <jmaes@linkedin.com>
Thu, 4 May 2017 20:57:50 +0000 (13:57 -0700)
committerJacob Maes <jmaes@linkedin.com>
Thu, 4 May 2017 20:57:50 +0000 (13:57 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>, Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #165 from jmakes/samza-1263

samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java

index 237bedd..0cdeb95 100644 (file)
@@ -137,7 +137,7 @@ public class StreamSpec {
    * @param config          A map of properties for the stream. These may be System-specfic.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount,  Map<String, String> config) {
-    validateLogicalIdentifier("id", id);
+    validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);
 
     if (partitionCount < 1) {
@@ -197,7 +197,7 @@ public class StreamSpec {
   }
 
   private void validateLogicalIdentifier(String identifierName, String identifierValue) {
-    if (!identifierValue.matches("[A-Za-z0-9_-]+")) {
+    if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
       throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
     }
   }
index bd95d0b..aaacd6e 100644 (file)
@@ -20,17 +20,16 @@ package org.apache.samza.runtime;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
 
 public class TestAbstractApplicationRunner {
   private static final String STREAM_ID = "t3st-Stream_Id";
@@ -119,7 +118,7 @@ public class TestAbstractApplicationRunner {
   }
 
   // System is required. Throw if it cannot be determined.
-  @Test(expected = Exception.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testgetStreamWithOutSystemInConfig() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
@@ -291,8 +290,8 @@ public class TestAbstractApplicationRunner {
     runner.getStreamSpec(STREAM_ID, TEST_PHYSICAL_NAME, "");
   }
 
-  // Null is not allowed for system name.
-  @Test(expected = NullPointerException.class)
+  // Null is not allowed IllegalArgumentException system name.
+  @Test(expected = IllegalArgumentException.class)
   public void testGetStreamSystemNameArgNull() {
     Config config = buildStreamConfig(STREAM_ID,
                                       StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
@@ -323,7 +322,7 @@ public class TestAbstractApplicationRunner {
   }
 
   // Null is not allowed for streamId.
-  @Test(expected = NullPointerException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testGetStreamStreamIdNull() {
     Config config = buildStreamConfig(null,
         StreamConfig.SYSTEM(), TEST_SYSTEM);