SAMZA-1692: Standalone stability fixes.
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ZkLeaderElector.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.zk;
21
22 import com.google.common.annotations.VisibleForTesting;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.Arrays;
26 import java.util.List;
27 import java.util.Random;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import org.I0Itec.zkclient.IZkDataListener;
30 import org.apache.samza.SamzaException;
31 import org.apache.samza.coordinator.LeaderElectorListener;
32 import org.apache.samza.coordinator.LeaderElector;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37 * <p>
38 * An implementation of Leader Elector using Zookeeper.
39 *
40 * Each participant in the leader election process creates an instance of this class and tries to become the leader.
41 * The participant with the lowest sequence number in the ZK subtree for election becomes the leader. Every non-leader
42 * sets a watcher on its predecessor, where the predecessor is the participant with the largest sequence number
43 * that is less than the current participant's sequence number.
44 * </p>
45 * */
46 public class ZkLeaderElector implements LeaderElector {
47 public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);
48 private final ZkUtils zkUtils;
49 private final String processorIdStr;
50 private final ZkKeyBuilder keyBuilder;
51 private final String hostName;
52
53 private AtomicBoolean isLeader = new AtomicBoolean(false);
54 private IZkDataListener previousProcessorChangeListener;
55 private LeaderElectorListener leaderElectorListener = null;
56 private String currentSubscription = null;
57 private final Random random = new Random();
58
59 public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
60 this.processorIdStr = processorIdStr;
61 this.zkUtils = zkUtils;
62 this.keyBuilder = zkUtils.getKeyBuilder();
63 this.hostName = getHostName();
64 this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
65
66 zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath()});
67 }
68
69 @VisibleForTesting
70 public ZkLeaderElector(String processorIdStr,
71 ZkUtils zkUtils,
72 IZkDataListener previousProcessorChangeListener) {
73 this.processorIdStr = processorIdStr;
74 this.zkUtils = zkUtils;
75 this.keyBuilder = zkUtils.getKeyBuilder();
76 this.hostName = getHostName();
77 this.previousProcessorChangeListener = previousProcessorChangeListener;
78 }
79
80 // TODO: This should go away once we integrate with Zk based Job Coordinator
81 private String getHostName() {
82 try {
83 return InetAddress.getLocalHost().getHostName();
84 } catch (UnknownHostException e) {
85 LOG.error("Failed to fetch hostname of the processor", e);
86 throw new SamzaException(e);
87 }
88 }
89
90 /**
91 * Register a LeaderElectorListener
92 *
93 * @param listener {@link LeaderElectorListener} interfaces to be invoked upon completion of leader election participation
94 */
95 @Override
96 public void setLeaderElectorListener(LeaderElectorListener listener) {
97 this.leaderElectorListener = listener;
98 }
99
100 /**
101 * Async method that helps the caller participate in leader election.
102 **/
103 @Override
104 public void tryBecomeLeader() {
105 String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
106
107 List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
108 LOG.debug(zLog("Current active processors - " + children));
109 int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
110
111 LOG.info("tryBecomeLeader: index = " + index + " for path=" + currentPath + " out of " + Arrays.toString(children.toArray()));
112 if (children.size() == 0 || index == -1) {
113 throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
114 }
115
116 if (index == 0) {
117 isLeader.getAndSet(true);
118 LOG.info(zLog("Eligible to become the leader!"));
119 if (leaderElectorListener != null) {
120 leaderElectorListener.onBecomingLeader();
121 }
122 return;
123 }
124
125 isLeader.getAndSet(false);
126 LOG.info("Index = " + index + " Not eligible to be a leader yet!");
127 String predecessor = children.get(index - 1);
128 if (!predecessor.equals(currentSubscription)) {
129 if (currentSubscription != null) {
130 LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
131 zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
132 previousProcessorChangeListener);
133 previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
134 }
135 currentSubscription = predecessor;
136 LOG.info(zLog("Subscribing data change for " + predecessor));
137 zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
138 previousProcessorChangeListener);
139 }
140 /**
141 * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
142 * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't
143 * exist during subscription, it is not going to get created in the future.
144 */
145 boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
146 if (predecessorExists) {
147 LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
148 } else {
149 try {
150 Thread.sleep(random.nextInt(1000));
151 } catch (InterruptedException e) {
152 Thread.interrupted();
153 }
154 LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
155 tryBecomeLeader();
156 }
157 }
158
159 @Override
160 public void resignLeadership() {
161 isLeader.compareAndSet(true, false);
162 }
163
164 @Override
165 public boolean amILeader() {
166 return isLeader.get();
167 }
168
169 private String zLog(String logMessage) {
170 return String.format("[Processor-%s] %s", processorIdStr, logMessage);
171 }
172
173 // Only by non-leaders
174 class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener {
175
176 public PreviousProcessorChangeListener(ZkUtils zkUtils) {
177 super(zkUtils, "PreviousProcessorChangeListener");
178 }
179 @Override
180 public void handleDataChange(String dataPath, Object data) throws Exception {
181 LOG.debug("Data change on path: " + dataPath + " Data: " + data);
182 if (notAValidEvent())
183 return;
184 }
185
186 @Override
187 public void handleDataDeleted(String dataPath)
188 throws Exception {
189 LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again..."));
190 if (notAValidEvent()) {
191 return;
192 }
193 tryBecomeLeader();
194 }
195 }
196 }