SAMZA-1329: Switch SamzaTaskProxy to use LocalityManager.
[samza.git] / samza-core / src / main / java / org / apache / samza / coordinator / stream / AbstractCoordinatorStreamManager.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.coordinator.stream;
21
22 import org.apache.samza.container.TaskName;
23 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
24
25 import java.util.Set;
26
27 /**
28 * Abstract class which handles the common functionality for coordinator stream consumer and producer
29 */
30 public abstract class AbstractCoordinatorStreamManager {
31 private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
32 private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
33 private final String source;
34
35 /**
36 * Creates a new {@link AbstractCoordinatorStreamManager} with a given coordinator stream producer, consumer and with a given source.
37 * @param coordinatorStreamProducer the {@link CoordinatorStreamSystemProducer} which should be used with the {@link AbstractCoordinatorStreamManager}
38 * @param coordinatorStreamConsumer the {@link CoordinatorStreamSystemConsumer} which should be used with the {@link AbstractCoordinatorStreamManager}
39 * @param source ths source for the coordinator stream producer
40 */
41 protected AbstractCoordinatorStreamManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, CoordinatorStreamSystemConsumer coordinatorStreamConsumer, String source) {
42 this.coordinatorStreamProducer = coordinatorStreamProducer;
43 this.coordinatorStreamConsumer = coordinatorStreamConsumer;
44 this.source = source;
45 }
46
47 /**
48 * Starts the underlying coordinator stream producer and consumer.
49 */
50 public void start() {
51 if (coordinatorStreamProducer != null) {
52 coordinatorStreamProducer.start();
53 }
54 if (coordinatorStreamConsumer != null) {
55 coordinatorStreamConsumer.start();
56 }
57 }
58
59 /**
60 * Stops the underlying coordinator stream producer and consumer.
61 */
62 public void stop() {
63 if (coordinatorStreamConsumer != null) {
64 coordinatorStreamConsumer.stop();
65 }
66 if (coordinatorStreamProducer != null) {
67 coordinatorStreamProducer.stop();
68 }
69 }
70
71 /**
72 * Sends a {@link CoordinatorStreamMessage} using the underlying system producer.
73 * @param message message which should be sent to producer
74 */
75 public void send(CoordinatorStreamMessage message) {
76 if (coordinatorStreamProducer == null) {
77 throw new UnsupportedOperationException(String.format("CoordinatorStreamProducer is not initialized in the AbstractCoordinatorStreamManager. "
78 + "manager registered source: %s, input source: %s", this.source, source));
79 }
80 coordinatorStreamProducer.send(message);
81 }
82
83 /**
84 * Returns a set of messages from the bootstrapped stream for a given source.
85 * @param source the source of the given messages
86 * @return a set of {@link CoordinatorStreamMessage} if messages exists for the given source, else an empty set
87 */
88 public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
89 if (coordinatorStreamConsumer == null) {
90 throw new UnsupportedOperationException(String.format("CoordinatorStreamConsumer is not initialized in the AbstractCoordinatorStreamManager. "
91 + "manager registered source: %s, input source: %s", this.source, source));
92 }
93 return coordinatorStreamConsumer.getBootstrappedStream(source);
94 }
95
96 /**
97 * Register the coordinator stream consumer.
98 */
99 protected void registerCoordinatorStreamConsumer() {
100 if (coordinatorStreamConsumer != null) {
101 coordinatorStreamConsumer.register();
102 }
103 }
104
105 /**
106 * Registers the coordinator stream producer for a given source.
107 * @param source the source to register
108 */
109 protected void registerCoordinatorStreamProducer(String source) {
110 if (coordinatorStreamProducer != null) {
111 coordinatorStreamProducer.register(source);
112 }
113 }
114
115 /**
116 * Returns the source name which is managed by {@link AbstractCoordinatorStreamManager}.
117 * @return the source name
118 */
119 protected String getSource() {
120 return source;
121 }
122
123 /**
124 * Registers a consumer and a producer. Every subclass should implement it's logic for registration.<br><br>
125 * Registering a single consumer and a single producer can be done with {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamConsumer()}
126 * and {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamProducer(String)} methods respectively.<br>
127 * These methods can be used in the concrete implementation of this register method.
128 *
129 * @param taskName name which should be used with the producer
130 */
131 public abstract void register(TaskName taskName);
132 }