ecf118bed37d45756ef89a27fae5544084d9df9f
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ZkUtils.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.zk;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import org.I0Itec.zkclient.IZkChildListener;
28 import org.I0Itec.zkclient.IZkDataListener;
29 import org.I0Itec.zkclient.ZkClient;
30 import org.I0Itec.zkclient.ZkConnection;
31 import org.I0Itec.zkclient.exception.ZkInterruptedException;
32 import org.apache.samza.SamzaException;
33 import org.apache.samza.job.model.JobModel;
34 import org.apache.samza.metrics.MetricsRegistry;
35 import org.apache.samza.serializers.model.SamzaObjectMapper;
36 import org.apache.zookeeper.data.Stat;
37 import org.codehaus.jackson.map.ObjectMapper;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42 * Util class to help manage Zk connection and ZkClient.
43 * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
44 *
45 * <p>
46 * <b>Note on ZkClient:</b>
47 * {@link ZkClient} consists of two threads - I/O thread and Event thread.
48 * I/O thread manages heartbeats to the Zookeeper server in the ensemble and handles responses to synchronous methods
49 * in Zookeeper API.
50 * Event thread typically receives all the Watcher events and delivers to registered listeners. It, also, handles
51 * responses to asynchronous methods in Zookeeper API.
52 * </p>
53 *
54 * <p>
55 * <b>Note on Session Management:</b>
56 * Session management, if needed, should be handled by the caller. This can be done by implementing
57 * {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
58 * callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking
59 * processing in the callbacks.
60 * </p>
61 */
62 public class ZkUtils {
63 private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
64
65 private final ZkClient zkClient;
66 private volatile String ephemeralPath = null;
67 private final ZkKeyBuilder keyBuilder;
68 private final int connectionTimeoutMs;
69 private final ZkUtilsMetrics metrics;
70
/**
 * Creates a helper around an already constructed {@link ZkClient}.
 *
 * @param zkKeyBuilder builder used to derive the ZK paths this helper reads, writes and watches
 * @param zkClient client through which every ZooKeeper call of this class is made
 * @param connectionTimeoutMs maximum time, in milliseconds, that {@link #connect()} waits for a connection
 * @param metricsRegistry registry handed to the {@link ZkUtilsMetrics} instance created here
 */
public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) {
  this.zkClient = zkClient;
  this.keyBuilder = zkKeyBuilder;
  this.metrics = new ZkUtilsMetrics(metricsRegistry);
  this.connectionTimeoutMs = connectionTimeoutMs;
}
77
78 public void connect() throws ZkInterruptedException {
79 boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
80 if (!isConnected) {
81 metrics.zkConnectionError.inc();
82 throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
83 }
84 }
85
86 public static ZkConnection createZkConnection(String zkConnectString, int sessionTimeoutMs) {
87 return new ZkConnection(zkConnectString, sessionTimeoutMs);
88 }
89
90 ZkClient getZkClient() {
91 return zkClient;
92 }
93
94 public ZkKeyBuilder getKeyBuilder() {
95 return keyBuilder;
96 }
97
98 /**
99 * Returns a ZK generated identifier for this client.
100 * If the current client is registering for the first time, it creates an ephemeral sequential node in the ZK tree
101 * If the current client has already registered and is still within the same session, it returns the already existing
102 * value for the ephemeralPath
103 *
104 * @param data Object that should be written as data in the registered ephemeral ZK node
105 * @return String representing the absolute ephemeralPath of this client in the current session
106 */
107 public synchronized String registerProcessorAndGetId(final ProcessorData data) {
108 if (ephemeralPath == null) {
109 ephemeralPath =
110 zkClient.createEphemeralSequential(
111 keyBuilder.getProcessorsPath() + "/", data.toString());
112
113 LOG.info("newly generated path for " + data + " is " + ephemeralPath);
114 return ephemeralPath;
115 } else {
116 LOG.info("existing path for " + data + " is " + ephemeralPath);
117 return ephemeralPath;
118 }
119 }
120
121 /**
122 * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
123 *
124 * @return List of absolute ZK node paths
125 */
126 public List<String> getSortedActiveProcessorsZnodes() {
127 List<String> znodeIds = zkClient.getChildren(keyBuilder.getProcessorsPath());
128 if (znodeIds.size() > 0) {
129 Collections.sort(znodeIds);
130 LOG.info("Found these children - " + znodeIds);
131 }
132 return znodeIds;
133 }
134
135 /**
136 * Method is used to read processor's data from the znode
137 * @param fullPath absolute path to the znode
138 * @return processor's data
139 * @throws SamzaException when fullPath doesn't exist in zookeeper
140 * or problems with connecting to zookeeper.
141 */
142 String readProcessorData(String fullPath) {
143 try {
144 String data = zkClient.readData(fullPath, false);
145 metrics.reads.inc();
146 return data;
147 } catch (Exception e) {
148 throw new SamzaException(String.format("Cannot read ZK node: %s", fullPath), e);
149 }
150 }
151
152 /**
153 * Method is used to get the list of currently active/registered processor ids
154 * @return List of processorIds
155 */
156 public List<String> getSortedActiveProcessorsIDs() {
157 return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
158 }
159
160 /**
161 * Method is used to get the <i>sorted</i> list of processors ids for a given list of znodes
162 * @param znodeIds - list of relative paths of the children's znodes
163 * @return List of processor ids for a given list of znodes
164 */
165 public List<String> getActiveProcessorsIDs(List<String> znodeIds) {
166 String processorPath = keyBuilder.getProcessorsPath();
167 List<String> processorIds = new ArrayList<>(znodeIds.size());
168 if (znodeIds.size() > 0) {
169
170 for (String child : znodeIds) {
171 String fullPath = String.format("%s/%s", processorPath, child);
172 processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId());
173 }
174
175 LOG.info("Found these children - " + znodeIds);
176 LOG.info("Found these processorIds - " + processorIds);
177 }
178 return processorIds;
179 }
180
181 /* Wrapper for standard I0Itec methods */
182 public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
183 zkClient.unsubscribeDataChanges(path, dataListener);
184 }
185
186 public void subscribeDataChanges(String path, IZkDataListener dataListener) {
187 zkClient.subscribeDataChanges(path, dataListener);
188 metrics.subscriptions.inc();
189 }
190
191 public void subscribeChildChanges(String path, IZkChildListener listener) {
192 zkClient.subscribeChildChanges(path, listener);
193 metrics.subscriptions.inc();
194 }
195
196 public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
197 zkClient.unsubscribeChildChanges(path, childListener);
198 }
199
200 public void writeData(String path, Object object) {
201 zkClient.writeData(path, object);
202 metrics.writes.inc();
203 }
204
205 public boolean exists(String path) {
206 return zkClient.exists(path);
207 }
208
209 public void close() throws ZkInterruptedException {
210 zkClient.close();
211 }
212
213 /**
214 * subscribe for changes of JobModel version
215 * @param dataListener describe this
216 */
217 public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
218 LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
219 zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
220 metrics.subscriptions.inc();
221 }
222
223 /**
224 * Publishes new job model into ZK.
225 * This call should FAIL if the node already exists.
226 * @param jobModelVersion version of the jobModeL to publish
227 * @param jobModel jobModel to publish
228 *
229 */
230 public void publishJobModel(String jobModelVersion, JobModel jobModel) {
231 try {
232 ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
233 String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
234 LOG.info("jobModelAsString=" + jobModelStr);
235 zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
236 LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
237 } catch (Exception e) {
238 LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
239 throw new SamzaException(e);
240 }
241 }
242
243 /**
244 * get the job model from ZK by version
245 * @param jobModelVersion jobModel version to get
246 * @return job model for this version
247 */
248 public JobModel getJobModel(String jobModelVersion) {
249 LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
250 Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
251 metrics.reads.inc();
252 ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
253 JobModel jm;
254 try {
255 jm = mmapper.readValue((String) data, JobModel.class);
256 } catch (IOException e) {
257 throw new SamzaException("failed to read JobModel from ZK", e);
258 }
259 return jm;
260 }
261
262 /**
263 * read the jobmodel version from ZK
264 * @return jobmodel version as a string
265 */
266 public String getJobModelVersion() {
267 String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath());
268 metrics.reads.inc();
269 return jobModelVersion;
270 }
271
272 /**
273 * publish the version number of the next JobModel
274 * @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
275 * @param newVersion - new version.
276 */
277 public void publishJobModelVersion(String oldVersion, String newVersion) {
278 Stat stat = new Stat();
279 String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
280 metrics.reads.inc();
281 LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
282 .getVersion() + ")");
283
284 if (currentVersion != null && !currentVersion.equals(oldVersion)) {
285 throw new SamzaException(
286 "Someone changed JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
287 }
288 // data version is the ZK version of the data from the ZK.
289 int dataVersion = stat.getVersion();
290 try {
291 stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
292 metrics.writes.inc();
293 } catch (Exception e) {
294 String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
295 LOG.error(msg, e);
296 throw new SamzaException(msg, e);
297 }
298 LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) +
299 "(actual data version after update = " + stat.getVersion() + ")");
300 }
301
302
303 /**
304 * verify that given paths exist in ZK
305 * @param paths - paths to verify or create
306 */
307 public void makeSurePersistentPathsExists(String[] paths) {
308 for (String path : paths) {
309 if (!zkClient.exists(path)) {
310 zkClient.createPersistent(path, true);
311 }
312 }
313 }
314
315 /**
316 * subscribe to the changes in the list of processors in ZK
317 * @param listener - will be called when a processor is added or removed.
318 */
319 public void subscribeToProcessorChange(IZkChildListener listener) {
320 LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
321 zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
322 metrics.subscriptions.inc();
323 }
324 }