SAMZA-1516: Another round of issues found by BEAM tests
author xiliu <xiliu@xiliu-ld1.linkedin.biz>
Wed, 29 Nov 2017 19:34:28 +0000 (11:34 -0800)
committer xiliu <xiliu@xiliu-ld1.linkedin.biz>
Wed, 29 Nov 2017 19:34:28 +0000 (11:34 -0800)
A couple more fixes: 1. Fix a bug in identifying the input streams of an operator. 2. For partitionBy, set the partitionKey to 0L when the key is null.

Author: xiliu <xiliu@xiliu-ld1.linkedin.biz>

Reviewers: Jagadish V <vjagadish1989@gmail.com>

Closes #370 from xinyuiscool/SAMZA-1516

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java

index 49b29c8..0bb12d2 100644 (file)
@@ -155,6 +155,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       SystemStream inputStream, Config config, TaskContext context) {
+
     if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
@@ -174,9 +175,16 @@ public class OperatorImplGraph {
         });
       return operatorImpl;
     } else {
-      // the implementation corresponding to operatorSpec has already been instantiated
-      // and registered, so we do not need to traverse the DAG further.
-      return operatorImpls.get(operatorSpec.getOpId());
+      // the implementation corresponding to operatorSpec has already been instantiated and registered.
+      OperatorImpl operatorImpl = operatorImpls.get(operatorSpec.getOpId());
+      operatorImpl.registerInputStream(inputStream);
+
+      // We still need to traverse the DAG further to register the input streams.
+      Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
+      registeredSpecs.forEach(registeredSpec -> {
+          createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
+        });
+      return operatorImpl;
     }
   }
 
index 424c10f..b3fb4b2 100644 (file)
@@ -77,7 +77,8 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
       TaskCoordinator coordinator) {
     K key = keyFunction.apply(message);
     V value = valueFunction.apply(message);
-    collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+    Long partitionKey = key == null ? 0L : null;
+    collector.send(new OutgoingMessageEnvelope(systemStream, partitionKey, key, value));
     return Collections.emptyList();
   }