SAMZA-1329: Switch SamzaTaskProxy to use LocalityManager.
[samza.git] / samza-core / src / main / java / org / apache / samza / coordinator / stream / AbstractCoordinatorStreamManager.java
index 813234b..9b0d849 100644 (file)
@@ -48,7 +48,9 @@ public abstract class AbstractCoordinatorStreamManager {
    * Starts the underlying coordinator stream producer and consumer.
    */
   public void start() {
-    coordinatorStreamProducer.start();
+    if (coordinatorStreamProducer != null) {
+      coordinatorStreamProducer.start();
+    }
     if (coordinatorStreamConsumer != null) {
       coordinatorStreamConsumer.start();
     }
@@ -61,7 +63,9 @@ public abstract class AbstractCoordinatorStreamManager {
     if (coordinatorStreamConsumer != null) {
       coordinatorStreamConsumer.stop();
     }
-    coordinatorStreamProducer.stop();
+    if (coordinatorStreamProducer != null) {
+      coordinatorStreamProducer.stop();
+    }
   }
 
   /**
@@ -69,6 +73,10 @@ public abstract class AbstractCoordinatorStreamManager {
    * @param message message which should be sent to producer
    */
   public void send(CoordinatorStreamMessage message) {
+    if (coordinatorStreamProducer == null) {
+      throw new UnsupportedOperationException(String.format("CoordinatorStreamProducer is not initialized in the AbstractCoordinatorStreamManager. "
+          + "manager registered source: %s, input source: %s", this.source, source));
+    }
     coordinatorStreamProducer.send(message);
   }
 
@@ -89,7 +97,9 @@ public abstract class AbstractCoordinatorStreamManager {
    * Register the coordinator stream consumer.
    */
   protected void registerCoordinatorStreamConsumer() {
-    coordinatorStreamConsumer.register();
+    if (coordinatorStreamConsumer != null) {
+      coordinatorStreamConsumer.register();
+    }
   }
 
   /**
@@ -97,7 +107,9 @@ public abstract class AbstractCoordinatorStreamManager {
    * @param source the source to register
    */
   protected void registerCoordinatorStreamProducer(String source) {
-    coordinatorStreamProducer.register(source);
+    if (coordinatorStreamProducer != null) {
+      coordinatorStreamProducer.register(source);
+    }
   }
 
   /**