94c3054196a479936092206a1c22d5640b0dcd9d
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ZkJobCoordinator.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.zk;
20
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import org.apache.commons.lang3.StringUtils;
26 import org.apache.samza.config.ApplicationConfig;
27 import org.apache.samza.config.Config;
28 import org.apache.samza.config.ConfigException;
29 import org.apache.samza.config.JobConfig;
30 import org.apache.samza.config.MetricsConfig;
31 import org.apache.samza.config.ZkConfig;
32 import org.apache.samza.coordinator.JobCoordinator;
33 import org.apache.samza.coordinator.JobCoordinatorListener;
34 import org.apache.samza.coordinator.JobModelManager;
35 import org.apache.samza.coordinator.LeaderElector;
36 import org.apache.samza.coordinator.LeaderElectorListener;
37 import org.apache.samza.job.model.JobModel;
38 import org.apache.samza.metrics.MetricsReporter;
39 import org.apache.samza.metrics.MetricsRegistry;
40 import org.apache.samza.metrics.ReadableMetricsRegistry;
41 import org.apache.samza.runtime.ProcessorIdGenerator;
42 import org.apache.samza.system.StreamMetadataCache;
43 import org.apache.samza.util.ClassLoaderHelper;
44 import org.apache.samza.util.MetricsReporterLoader;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49 * JobCoordinator for stand alone processor managed via Zookeeper.
50 */
51 public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
52 private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
53 // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
54 // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
55 private static final int METADATA_CACHE_TTL_MS = 5000;
56
57 private final ZkUtils zkUtils;
58 private final String processorId;
59 private final ZkController zkController;
60
61 private final Config config;
62 private final ZkBarrierForVersionUpgrade barrier;
63 private final ZkJobCoordinatorMetrics metrics;
64 private final Map<String, MetricsReporter> reporters;
65
66 private StreamMetadataCache streamMetadataCache = null;
67 private ScheduleAfterDebounceTime debounceTimer = null;
68 private JobCoordinatorListener coordinatorListener = null;
69 private JobModel newJobModel;
70 private int debounceTimeMs;
71
72 public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) {
73 this.config = config;
74 ZkConfig zkConfig = new ZkConfig(config);
75 ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
76 this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
77 this.zkUtils = new ZkUtils(
78 keyBuilder,
79 ZkCoordinationServiceFactory.createZkClient(
80 zkConfig.getZkConnect(),
81 zkConfig.getZkSessionTimeoutMs(),
82 zkConfig.getZkConnectionTimeoutMs()),
83 zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
84
85 this.processorId = createProcessorId(config);
86 LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
87 leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
88 this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
89 this.barrier = new ZkBarrierForVersionUpgrade(
90 keyBuilder.getJobModelVersionBarrierPrefix(),
91 zkUtils,
92 new ZkBarrierListenerImpl());
93 this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
94 this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
95
96 }
97
98 @Override
99 public void start() {
100 startMetrics();
101 streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
102
103 debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
104 LOG.error("Received exception from in JobCoordinator Processing!", throwable);
105 stop();
106 });
107
108 zkController.register();
109 }
110
111 @Override
112 public synchronized void stop() {
113 if (coordinatorListener != null) {
114 coordinatorListener.onJobModelExpired();
115 }
116 //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore
117 metrics.isLeader.set(false);
118 debounceTimer.stopScheduler();
119 zkController.stop();
120
121 shutdownMetrics();
122 if (coordinatorListener != null) {
123 coordinatorListener.onCoordinatorStop();
124 }
125 }
126
127 private void startMetrics() {
128 for (MetricsReporter reporter: reporters.values()) {
129 reporter.register("job-coordinator-" + processorId, (ReadableMetricsRegistry) metrics.getMetricsRegistry());
130 reporter.start();
131 }
132 }
133
134 private void shutdownMetrics() {
135 for (MetricsReporter reporter: reporters.values()) {
136 reporter.stop();
137 }
138 }
139
140 @Override
141 public void setListener(JobCoordinatorListener listener) {
142 this.coordinatorListener = listener;
143 }
144
145 @Override
146 public JobModel getJobModel() {
147 return newJobModel;
148 }
149
150 @Override
151 public String getProcessorId() {
152 return processorId;
153 }
154
155 //////////////////////////////////////////////// LEADER stuff ///////////////////////////
156 @Override
157 public void onProcessorChange(List<String> processors) {
158 LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
159 debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
160 debounceTimeMs, () -> doOnProcessorChange(processors));
161 }
162
163 void doOnProcessorChange(List<String> processors) {
164 // if list of processors is empty - it means we are called from 'onBecomeLeader'
165 // TODO: Handle empty currentProcessorIds or duplicate processorIds in the list
166 List<String> currentProcessorIds = getActualProcessorIds(processors);
167
168 // Generate the JobModel
169 JobModel jobModel = generateNewJobModel(currentProcessorIds);
170 // Assign the next version of JobModel
171 String currentJMVersion = zkUtils.getJobModelVersion();
172 String nextJMVersion;
173 if (currentJMVersion == null) {
174 nextJMVersion = "1";
175 } else {
176 nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
177 }
178 LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
179
180 // Publish the new job model
181 zkUtils.publishJobModel(nextJMVersion, jobModel);
182
183 // Start the barrier for the job model update
184 barrier.create(nextJMVersion, currentProcessorIds);
185
186 // Notify all processors about the new JobModel by updating JobModel Version number
187 zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
188
189 LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
190 }
191
192 @Override
193 public void onNewJobModelAvailable(final String version) {
194 debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
195 {
196 LOG.info("pid=" + processorId + "new JobModel available");
197
198 // stop current work
199 if (coordinatorListener != null) {
200 coordinatorListener.onJobModelExpired();
201 }
202 // get the new job model from ZK
203 newJobModel = zkUtils.getJobModel(version);
204
205 LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
206
207 // update ZK and wait for all the processors to get this new version
208 barrier.join(version, processorId);
209 });
210 }
211
212 @Override
213 public void onNewJobModelConfirmed(String version) {
214 LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
215 // get the new Model
216 JobModel jobModel = getJobModel();
217
218 // start the container with the new model
219 if (coordinatorListener != null) {
220 coordinatorListener.onNewJobModel(processorId, jobModel);
221 }
222 }
223
224 private String createProcessorId(Config config) {
225 // TODO: This check to be removed after 0.13+
226 ApplicationConfig appConfig = new ApplicationConfig(config);
227 if (appConfig.getProcessorId() != null) {
228 return appConfig.getProcessorId();
229 } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
230 ProcessorIdGenerator idGenerator =
231 ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
232 return idGenerator.generateProcessorId(config);
233 } else {
234 throw new ConfigException(String
235 .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
236 ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
237 }
238 }
239
240 private List<String> getActualProcessorIds(List<String> processors) {
241 if (processors.size() > 0) {
242 // we should use this list
243 // but it needs to be converted into PIDs, which is part of the data
244 return zkUtils.getActiveProcessorsIDs(processors);
245 } else {
246 // get the current list of processors
247 return zkUtils.getSortedActiveProcessorsIDs();
248 }
249 }
250
251 /**
252 * Generate new JobModel when becoming a leader or the list of processor changed.
253 */
254 private JobModel generateNewJobModel(List<String> processors) {
255 return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
256 processors);
257 }
258
259 class LeaderElectorListenerImpl implements LeaderElectorListener {
260 @Override
261 public void onBecomingLeader() {
262 LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
263 metrics.isLeader.set(true);
264 zkController.subscribeToProcessorChange();
265 debounceTimer.scheduleAfterDebounceTime(
266 ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
267 debounceTimeMs, () -> {
268 // actual actions to do are the same as onProcessorChange
269 doOnProcessorChange(new ArrayList<>());
270 });
271 }
272 }
273
274 class ZkBarrierListenerImpl implements ZkBarrierListener {
275 private final String barrierAction = "BarrierAction";
276 private long startTime = 0;
277
278 @Override
279 public void onBarrierCreated(String version) {
280 // Start the timer for rebalancing
281 startTime = System.nanoTime();
282
283 metrics.barrierCreation.inc();
284 debounceTimer.scheduleAfterDebounceTime(
285 barrierAction,
286 (new ZkConfig(config)).getZkBarrierTimeoutMs(),
287 () -> barrier.expire(version)
288 );
289 }
290
291 public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
292 LOG.info("JobModel version " + version + " obtained consensus successfully!");
293 metrics.barrierStateChange.inc();
294 metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
295 if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
296 debounceTimer.scheduleAfterDebounceTime(
297 barrierAction,
298 0,
299 () -> onNewJobModelConfirmed(version));
300 } else {
301 if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
302 // no-op
303 // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
304 // participants failed to join. That means, they should have de-registered from "processors" list
305 // and that would have triggered onProcessorChange action -> a new round of consensus.
306 LOG.info("Barrier for version " + version + " timed out.");
307 }
308 }
309 }
310
311 @Override
312 public void onBarrierError(String version, Throwable t) {
313 LOG.error("Encountered error while attaining consensus on JobModel version " + version);
314 metrics.barrierError.inc();
315 stop();
316 }
317 }
318 }