SAMZA-1304: Handling duplicate stream processor registration.
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ZkJobCoordinator.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.zk;
20
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import org.apache.commons.lang3.StringUtils;
28 import org.apache.samza.config.ApplicationConfig;
29 import org.apache.samza.config.Config;
30 import org.apache.samza.config.ConfigException;
31 import org.apache.samza.config.JobConfig;
32 import org.apache.samza.config.MetricsConfig;
33 import org.apache.samza.config.ZkConfig;
34 import org.apache.samza.coordinator.JobCoordinator;
35 import org.apache.samza.coordinator.JobCoordinatorListener;
36 import org.apache.samza.coordinator.JobModelManager;
37 import org.apache.samza.coordinator.LeaderElector;
38 import org.apache.samza.coordinator.LeaderElectorListener;
39 import org.apache.samza.job.model.JobModel;
40 import org.apache.samza.metrics.MetricsReporter;
41 import org.apache.samza.metrics.MetricsRegistry;
42 import org.apache.samza.metrics.ReadableMetricsRegistry;
43 import org.apache.samza.runtime.ProcessorIdGenerator;
44 import org.apache.samza.system.StreamMetadataCache;
45 import org.apache.samza.util.ClassLoaderHelper;
46 import org.apache.samza.util.MetricsReporterLoader;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51 * JobCoordinator for standalone processors managed via ZooKeeper.
52 */
53 public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
54 private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
55 // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
56 // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
57 private static final int METADATA_CACHE_TTL_MS = 5000;
58
59 private final ZkUtils zkUtils;
60 private final String processorId;
61 private final ZkController zkController;
62
63 private final Config config;
64 private final ZkBarrierForVersionUpgrade barrier;
65 private final ZkJobCoordinatorMetrics metrics;
66 private final Map<String, MetricsReporter> reporters;
67
68 private StreamMetadataCache streamMetadataCache = null;
69 private ScheduleAfterDebounceTime debounceTimer = null;
70 private JobCoordinatorListener coordinatorListener = null;
71 private JobModel newJobModel;
72 private int debounceTimeMs;
73
74 public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) {
75 this.config = config;
76 ZkConfig zkConfig = new ZkConfig(config);
77 ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
78 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
79 this.zkUtils = new ZkUtils(
80 keyBuilder,
81 ZkCoordinationServiceFactory.createZkClient(
82 zkConfig.getZkConnect(),
83 zkConfig.getZkSessionTimeoutMs(),
84 zkConfig.getZkConnectionTimeoutMs()),
85 zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
86
87 this.processorId = createProcessorId(config);
88 LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
89 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
90 this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
91 this.barrier = new ZkBarrierForVersionUpgrade(
92 keyBuilder.getJobModelVersionBarrierPrefix(),
93 zkUtils,
94 new ZkBarrierListenerImpl());
95 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
96 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
97
98 }
99
100 @Override
101 public void start() {
102 startMetrics();
103 streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
104
105 debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
106 LOG.error("Received exception from in JobCoordinator Processing!", throwable);
107 stop();
108 });
109
110 zkController.register();
111 }
112
113 @Override
114 public synchronized void stop() {
115 if (coordinatorListener != null) {
116 coordinatorListener.onJobModelExpired();
117 }
118 //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore
119 metrics.isLeader.set(false);
120 debounceTimer.stopScheduler();
121 zkController.stop();
122
123 shutdownMetrics();
124 if (coordinatorListener != null) {
125 coordinatorListener.onCoordinatorStop();
126 }
127 }
128
129 private void startMetrics() {
130 for (MetricsReporter reporter: reporters.values()) {
131 reporter.register("job-coordinator-" + processorId, (ReadableMetricsRegistry) metrics.getMetricsRegistry());
132 reporter.start();
133 }
134 }
135
136 private void shutdownMetrics() {
137 for (MetricsReporter reporter: reporters.values()) {
138 reporter.stop();
139 }
140 }
141
142 @Override
143 public void setListener(JobCoordinatorListener listener) {
144 this.coordinatorListener = listener;
145 }
146
147 @Override
148 public JobModel getJobModel() {
149 return newJobModel;
150 }
151
152 @Override
153 public String getProcessorId() {
154 return processorId;
155 }
156
157 //////////////////////////////////////////////// LEADER stuff ///////////////////////////
158 @Override
159 public void onProcessorChange(List<String> processors) {
160 LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
161 debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
162 debounceTimeMs, () -> doOnProcessorChange(processors));
163 }
164
165 void doOnProcessorChange(List<String> processors) {
166 // if list of processors is empty - it means we are called from 'onBecomeLeader'
167 // TODO: Handle empty currentProcessorIds.
168 List<String> currentProcessorIds = getActualProcessorIds(processors);
169 Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds);
170
171 if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
172 LOG.info("Processors: {} has duplicates. Not generating job model.", currentProcessorIds);
173 return;
174 }
175
176 // Generate the JobModel
177 JobModel jobModel = generateNewJobModel(currentProcessorIds);
178 // Assign the next version of JobModel
179 String currentJMVersion = zkUtils.getJobModelVersion();
180 String nextJMVersion;
181 if (currentJMVersion == null) {
182 nextJMVersion = "1";
183 } else {
184 nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
185 }
186 LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
187
188 // Publish the new job model
189 zkUtils.publishJobModel(nextJMVersion, jobModel);
190
191 // Start the barrier for the job model update
192 barrier.create(nextJMVersion, currentProcessorIds);
193
194 // Notify all processors about the new JobModel by updating JobModel Version number
195 zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
196
197 LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
198 }
199
200 @Override
201 public void onNewJobModelAvailable(final String version) {
202 debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
203 {
204 LOG.info("pid=" + processorId + "new JobModel available");
205
206 // stop current work
207 if (coordinatorListener != null) {
208 coordinatorListener.onJobModelExpired();
209 }
210 // get the new job model from ZK
211 newJobModel = zkUtils.getJobModel(version);
212
213 LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
214
215 // update ZK and wait for all the processors to get this new version
216 barrier.join(version, processorId);
217 });
218 }
219
220 @Override
221 public void onNewJobModelConfirmed(String version) {
222 LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
223 // get the new Model
224 JobModel jobModel = getJobModel();
225
226 // start the container with the new model
227 if (coordinatorListener != null) {
228 coordinatorListener.onNewJobModel(processorId, jobModel);
229 }
230 }
231
232 private String createProcessorId(Config config) {
233 // TODO: This check to be removed after 0.13+
234 ApplicationConfig appConfig = new ApplicationConfig(config);
235 if (appConfig.getProcessorId() != null) {
236 return appConfig.getProcessorId();
237 } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
238 ProcessorIdGenerator idGenerator =
239 ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
240 return idGenerator.generateProcessorId(config);
241 } else {
242 throw new ConfigException(String
243 .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
244 ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
245 }
246 }
247
248 private List<String> getActualProcessorIds(List<String> processors) {
249 if (processors.size() > 0) {
250 // we should use this list
251 // but it needs to be converted into PIDs, which is part of the data
252 return zkUtils.getActiveProcessorsIDs(processors);
253 } else {
254 // get the current list of processors
255 return zkUtils.getSortedActiveProcessorsIDs();
256 }
257 }
258
259 /**
260 * Generate new JobModel when becoming a leader or the list of processor changed.
261 */
262 private JobModel generateNewJobModel(List<String> processors) {
263 return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
264 processors);
265 }
266
267 class LeaderElectorListenerImpl implements LeaderElectorListener {
268 @Override
269 public void onBecomingLeader() {
270 LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
271 metrics.isLeader.set(true);
272 zkController.subscribeToProcessorChange();
273 debounceTimer.scheduleAfterDebounceTime(
274 ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
275 debounceTimeMs, () -> {
276 // actual actions to do are the same as onProcessorChange
277 doOnProcessorChange(new ArrayList<>());
278 });
279 }
280 }
281
282 class ZkBarrierListenerImpl implements ZkBarrierListener {
283 private final String barrierAction = "BarrierAction";
284 private long startTime = 0;
285
286 @Override
287 public void onBarrierCreated(String version) {
288 // Start the timer for rebalancing
289 startTime = System.nanoTime();
290
291 metrics.barrierCreation.inc();
292 debounceTimer.scheduleAfterDebounceTime(
293 barrierAction,
294 (new ZkConfig(config)).getZkBarrierTimeoutMs(),
295 () -> barrier.expire(version)
296 );
297 }
298
299 public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
300 LOG.info("JobModel version " + version + " obtained consensus successfully!");
301 metrics.barrierStateChange.inc();
302 metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
303 if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
304 debounceTimer.scheduleAfterDebounceTime(
305 barrierAction,
306 0,
307 () -> onNewJobModelConfirmed(version));
308 } else {
309 if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
310 // no-op
311 // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
312 // participants failed to join. That means, they should have de-registered from "processors" list
313 // and that would have triggered onProcessorChange action -> a new round of consensus.
314 LOG.info("Barrier for version " + version + " timed out.");
315 }
316 }
317 }
318
319 @Override
320 public void onBarrierError(String version, Throwable t) {
321 LOG.error("Encountered error while attaining consensus on JobModel version " + version);
322 metrics.barrierError.inc();
323 stop();
324 }
325 }
326 }