SAMZA-1288: Add null check for sink OutputStream
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Fri, 12 May 2017 16:49:04 +0000 (09:49 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Fri, 12 May 2017 16:49:04 +0000 (09:49 -0700)
The logic to generate json for Sink operator does not check whether the output stream is null. This causes null pointer exception.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jake Maes <jmakes@apached.org>

Closes #188 from xinyuiscool/SAMZA-1288

samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java

index b52fbc3..b971607 100644 (file)
@@ -27,12 +27,9 @@ import java.util.stream.Collectors;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.operators.stream.OutputStreamInternal;
 
 public class OperatorJsonUtils {
-  private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class);
-
   private static final String OP_CODE = "opCode";
   private static final String OP_ID = "opId";
   private static final String SOURCE_LOCATION = "sourceLocation";
@@ -59,7 +56,10 @@ public class OperatorJsonUtils {
     }
 
     if (spec instanceof SinkOperatorSpec) {
-      map.put(OUTPUT_STREAM_ID, ((SinkOperatorSpec) spec).getOutputStream().getStreamSpec().getId());
+      OutputStreamInternal outputStream = ((SinkOperatorSpec) spec).getOutputStream();
+      if (outputStream != null) {
+        map.put(OUTPUT_STREAM_ID, outputStream.getStreamSpec().getId());
+      }
     }
 
     if (spec instanceof PartialJoinOperatorSpec) {
index 2681f9c..e53cd42 100644 (file)
@@ -113,6 +113,7 @@ public class TestJobGraphJsonGenerator {
     OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
 
     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
+    m2.sink((message, collector, coordinator) -> { });
     m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(outputStream2);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
@@ -124,7 +125,7 @@ public class TestJobGraphJsonGenerator {
     ObjectMapper mapper = new ObjectMapper();
     JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
     assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
-    assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12);
+    assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 13);
     assertTrue(nodes.sourceStreams.size() == 3);
     assertTrue(nodes.sinkStreams.size() == 2);
     assertTrue(nodes.intermediateStreams.size() == 2);