f4c1e941d35afb9cc4b4287a359e381ddde9d1d2
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ZkLeaderElector.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.zk;
21
22 import com.google.common.annotations.VisibleForTesting;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.Arrays;
26 import java.util.List;
27 import java.util.Random;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import org.I0Itec.zkclient.IZkDataListener;
30 import org.apache.samza.SamzaException;
31 import org.apache.samza.coordinator.LeaderElectorListener;
32 import org.apache.samza.coordinator.LeaderElector;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37 * <p>
38 * An implementation of Leader Elector using Zookeeper.
39 *
40 * Each participant in the leader election process creates an instance of this class and tries to become the leader.
41 * The participant with the lowest sequence number in the ZK subtree for election becomes the leader. Every non-leader
42 * sets a watcher on its predecessor, where the predecessor is the participant with the largest sequence number
43 * that is less than the current participant's sequence number.
44 * </p>
45 * */
46 public class ZkLeaderElector implements LeaderElector {
47 public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);
48 private final ZkUtils zkUtils;
49 private final String processorIdStr;
50 private final ZkKeyBuilder keyBuilder;
51 private final String hostName;
52
53 private AtomicBoolean isLeader = new AtomicBoolean(false);
54 private final IZkDataListener previousProcessorChangeListener;
55 private LeaderElectorListener leaderElectorListener = null;
56 private String currentSubscription = null;
57 private final Random random = new Random();
58
59 public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
60 this.processorIdStr = processorIdStr;
61 this.zkUtils = zkUtils;
62 this.keyBuilder = zkUtils.getKeyBuilder();
63 this.hostName = getHostName();
64 this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
65
66 zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath()});
67 }
68
69 @VisibleForTesting
70 public ZkLeaderElector(String processorIdStr,
71 ZkUtils zkUtils,
72 IZkDataListener previousProcessorChangeListener) {
73 this.processorIdStr = processorIdStr;
74 this.zkUtils = zkUtils;
75 this.keyBuilder = zkUtils.getKeyBuilder();
76 this.hostName = getHostName();
77 this.previousProcessorChangeListener = previousProcessorChangeListener;
78 }
79
80 // TODO: This should go away once we integrate with Zk based Job Coordinator
81 private String getHostName() {
82 try {
83 return InetAddress.getLocalHost().getHostName();
84 } catch (UnknownHostException e) {
85 LOG.error("Failed to fetch hostname of the processor", e);
86 throw new SamzaException(e);
87 }
88 }
89
90 /**
91 * Register a LeaderElectorListener
92 *
93 * @param listener {@link LeaderElectorListener} interfaces to be invoked upon completion of leader election participation
94 */
95 @Override
96 public void setLeaderElectorListener(LeaderElectorListener listener) {
97 this.leaderElectorListener = listener;
98 }
99
100 /**
101 * Async method that helps the caller participate in leader election.
102 **/
103 @Override
104 public void tryBecomeLeader() {
105 String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
106
107 List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
108 LOG.debug(zLog("Current active processors - " + children));
109 int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
110
111 LOG.info("tryBecomeLeader: index = " + index + " for path=" + currentPath + " out of " + Arrays.toString(children.toArray()));
112 if (children.size() == 0 || index == -1) {
113 throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
114 }
115
116 if (index == 0) {
117 isLeader.getAndSet(true);
118 LOG.info(zLog("Eligible to become the leader!"));
119 if (leaderElectorListener != null) {
120 leaderElectorListener.onBecomingLeader();
121 }
122 return;
123 }
124
125 isLeader.getAndSet(false);
126 LOG.info("Index = " + index + " Not eligible to be a leader yet!");
127 String predecessor = children.get(index - 1);
128 if (!predecessor.equals(currentSubscription)) {
129 if (currentSubscription != null) {
130 LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
131 zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
132 previousProcessorChangeListener);
133 }
134 currentSubscription = predecessor;
135 LOG.info(zLog("Subscribing data change for " + predecessor));
136 zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
137 previousProcessorChangeListener);
138 }
139 /**
140 * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
141 * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't
142 * exist during subscription, it is not going to get created in the future.
143 */
144 boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
145 if (predecessorExists) {
146 LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
147 } else {
148 try {
149 Thread.sleep(random.nextInt(1000));
150 } catch (InterruptedException e) {
151 Thread.interrupted();
152 }
153 LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
154 tryBecomeLeader();
155 }
156 }
157
158 @Override
159 public void resignLeadership() {
160 isLeader.compareAndSet(true, false);
161 }
162
163 @Override
164 public boolean amILeader() {
165 return isLeader.get();
166 }
167
168 private String zLog(String logMessage) {
169 return String.format("[Processor-%s] %s", processorIdStr, logMessage);
170 }
171
172 // Only by non-leaders
173 class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener {
174
175 public PreviousProcessorChangeListener(ZkUtils zkUtils) {
176 super(zkUtils, "PreviousProcessorChangeListener");
177 }
178 @Override
179 public void handleDataChange(String dataPath, Object data) throws Exception {
180 LOG.debug("Data change on path: " + dataPath + " Data: " + data);
181 if (notAValidEvent())
182 return;
183 }
184
185 @Override
186 public void handleDataDeleted(String dataPath)
187 throws Exception {
188 LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
189 if (notAValidEvent()) {
190 return;
191 }
192 tryBecomeLeader();
193 }
194 }
195 }