SAMZA-1361; OperatorImplGraph is using wrong keys to store/retrieve OperatorImpl...
[samza.git] / samza-core / src / main / java / org / apache / samza / operators / impl / OperatorImplGraph.java
index e5fce13..99496eb 100644 (file)
@@ -129,7 +129,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       Config config, TaskContext context) {
-    if (!operatorImpls.containsKey(operatorSpec) || operatorSpec instanceof JoinOperatorSpec) {
+    if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
       OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
@@ -145,7 +145,7 @@ public class OperatorImplGraph {
     } else {
       // the implementation corresponding to operatorSpec has already been instantiated
       // and registered, so we do not need to traverse the DAG further.
-      return operatorImpls.get(operatorSpec);
+      return operatorImpls.get(operatorSpec.getOpName());
     }
   }
 
@@ -179,7 +179,6 @@ public class OperatorImplGraph {
   private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec,
       JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) {
     Pair<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
-
     if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join
       return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true,
           partialJoinFunctions.getLeft(), partialJoinFunctions.getRight(), config, context, clock);