[HELIX-350]cluster status monitor should not be reset in FINALIZE type pipeline updat...
[helix.git] / helix-core / src / test / java / org / apache / helix / ZkTestHelper.java
1 package org.apache.helix;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.PrintWriter;
26 import java.net.Socket;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.TreeMap;
33 import java.util.TreeSet;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.CountDownLatch;
36
37 import org.I0Itec.zkclient.IZkChildListener;
38 import org.I0Itec.zkclient.IZkDataListener;
39 import org.I0Itec.zkclient.IZkStateListener;
40 import org.I0Itec.zkclient.ZkConnection;
41 import org.apache.helix.PropertyKey.Builder;
42 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
43 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
44 import org.apache.helix.manager.zk.ZkClient;
45 import org.apache.helix.model.ExternalView;
46 import org.apache.log4j.Logger;
47 import org.apache.zookeeper.WatchedEvent;
48 import org.apache.zookeeper.Watcher;
49 import org.apache.zookeeper.Watcher.Event.EventType;
50 import org.apache.zookeeper.Watcher.Event.KeeperState;
51 import org.apache.zookeeper.ZooKeeper;
52 import org.apache.zookeeper.ZooKeeper.States;
53 import org.testng.Assert;
54
55 public class ZkTestHelper {
56 private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
57
58 static {
59 // Logger.getRootLogger().setLevel(Level.DEBUG);
60 }
61
62 public static void disconnectSession(final ZkClient zkClient) throws Exception {
63 IZkStateListener listener = new IZkStateListener() {
64 @Override
65 public void handleStateChanged(KeeperState state) throws Exception {
66 // System.err.println("disconnectSession handleStateChanged. state: " + state);
67 }
68
69 @Override
70 public void handleNewSession() throws Exception {
71 // make sure zkclient is connected again
72 zkClient.waitUntilConnected();
73
74 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
75 ZooKeeper curZookeeper = connection.getZookeeper();
76
77 LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
78 }
79 };
80
81 zkClient.subscribeStateChanges(listener);
82 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
83 ZooKeeper curZookeeper = connection.getZookeeper();
84 LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
85
86 Watcher watcher = new Watcher() {
87 @Override
88 public void process(WatchedEvent event) {
89 LOG.info("Process watchEvent: " + event);
90 }
91 };
92
93 final ZooKeeper dupZookeeper =
94 new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher,
95 curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
96 // wait until connected, then close
97 while (dupZookeeper.getState() != States.CONNECTED) {
98 Thread.sleep(10);
99 }
100 dupZookeeper.close();
101
102 connection = (ZkConnection) zkClient.getConnection();
103 curZookeeper = connection.getZookeeper();
104 zkClient.unsubscribeStateChanges(listener);
105
106 // System.err.println("zk: " + oldZookeeper);
107 LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
108 }
109
110 /**
111 * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
112 */
113 public static void simulateZkStateDisconnected(ZkClient client) {
114 WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
115 client.process(event);
116 }
117
118 /**
119 * Get zk connection session id
120 * @param client
121 * @return
122 */
123 public static String getSessionId(ZkClient client) {
124 ZkConnection connection = ((ZkConnection) client.getConnection());
125 ZooKeeper curZookeeper = connection.getZookeeper();
126 return Long.toHexString(curZookeeper.getSessionId());
127 }
128
129 /**
130 * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
131 * @param zkClient
132 * @throws Exception
133 */
134 public static void expireSession(final ZkClient zkClient) throws Exception {
135 final CountDownLatch waitNewSession = new CountDownLatch(1);
136
137 IZkStateListener listener = new IZkStateListener() {
138 @Override
139 public void handleStateChanged(KeeperState state) throws Exception {
140 LOG.info("IZkStateListener#handleStateChanged, state: " + state);
141 }
142
143 @Override
144 public void handleNewSession() throws Exception {
145 // make sure zkclient is connected again
146 zkClient.waitUntilConnected();
147
148 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
149 ZooKeeper curZookeeper = connection.getZookeeper();
150
151 LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
152 waitNewSession.countDown();
153 }
154 };
155
156 zkClient.subscribeStateChanges(listener);
157
158 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
159 ZooKeeper curZookeeper = connection.getZookeeper();
160 String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
161 LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " + curZookeeper);
162
163 Watcher watcher = new Watcher() {
164 @Override
165 public void process(WatchedEvent event) {
166 LOG.info("Watcher#process, event: " + event);
167 }
168 };
169
170 final ZooKeeper dupZookeeper =
171 new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher,
172 curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
173 // wait until connected, then close
174 while (dupZookeeper.getState() != States.CONNECTED) {
175 Thread.sleep(10);
176 }
177 Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED, "Fail to connect to zk using current session info");
178 dupZookeeper.close();
179
180 // make sure session expiry really happens
181 waitNewSession.await();
182 zkClient.unsubscribeStateChanges(listener);
183
184 connection = (ZkConnection) zkClient.getConnection();
185 curZookeeper = connection.getZookeeper();
186
187 String newSessionId = Long.toHexString(curZookeeper.getSessionId());
188 LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
189 Assert.assertNotSame(newSessionId, oldSessionId, "Fail to expire current session, zk: " + curZookeeper);
190 }
191
192 /**
193 * expire zk session asynchronously
194 * @param zkClient
195 * @throws Exception
196 */
197 public static void asyncExpireSession(final ZkClient zkClient) throws Exception {
198 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
199 ZooKeeper curZookeeper = connection.getZookeeper();
200 LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
201
202 Watcher watcher = new Watcher() {
203 @Override
204 public void process(WatchedEvent event) {
205 LOG.info("Process watchEvent: " + event);
206 }
207 };
208
209 final ZooKeeper dupZookeeper =
210 new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher,
211 curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
212 // wait until connected, then close
213 while (dupZookeeper.getState() != States.CONNECTED) {
214 Thread.sleep(10);
215 }
216 dupZookeeper.close();
217
218 connection = (ZkConnection) zkClient.getConnection();
219 curZookeeper = connection.getZookeeper();
220
221 // System.err.println("zk: " + oldZookeeper);
222 LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
223 }
224
225 /*
226 * stateMap: partition->instance->state
227 */
228 public static boolean verifyState(ZkClient zkclient, String clusterName, String resourceName,
229 Map<String, Map<String, String>> expectStateMap, String op) {
230 boolean result = true;
231 ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
232 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
233 Builder keyBuilder = accessor.keyBuilder();
234
235 ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
236 Map<String, Map<String, String>> actualStateMap = extView.getRecord().getMapFields();
237 for (String partition : actualStateMap.keySet()) {
238 for (String expectPartiton : expectStateMap.keySet()) {
239 if (!partition.matches(expectPartiton)) {
240 continue;
241 }
242
243 Map<String, String> actualInstanceStateMap = actualStateMap.get(partition);
244 Map<String, String> expectInstanceStateMap = expectStateMap.get(expectPartiton);
245 for (String instance : actualInstanceStateMap.keySet()) {
246 for (String expectInstance : expectStateMap.get(expectPartiton).keySet()) {
247 if (!instance.matches(expectInstance)) {
248 continue;
249 }
250
251 String actualState = actualInstanceStateMap.get(instance);
252 String expectState = expectInstanceStateMap.get(expectInstance);
253 boolean equals = expectState.equals(actualState);
254 if (op.equals("==") && !equals || op.equals("!=") && equals) {
255 System.out.println(partition + "/" + instance + " state mismatch. actual state: "
256 + actualState + ", but expect: " + expectState + ", op: " + op);
257 result = false;
258 }
259 }
260 }
261 }
262 }
263 return result;
264 }
265
266 /**
267 * return the number of listeners on given zk-path
268 * @param zkAddr
269 * @param path
270 * @return
271 * @throws Exception
272 */
273 public static int numberOfListeners(String zkAddr, String path) throws Exception {
274 Map<String, Set<String>> listenerMap = getListenersByZkPath(zkAddr);
275 if (listenerMap.containsKey(path)) {
276 return listenerMap.get(path).size();
277 }
278 return 0;
279 }
280
281 /**
282 * return a map from zk-path to a set of zk-session-id that put watches on the zk-path
283 * @param zkAddr
284 * @return
285 * @throws Exception
286 */
287 public static Map<String, Set<String>> getListenersByZkPath(String zkAddr) throws Exception {
288 String splits[] = zkAddr.split(":");
289 Map<String, Set<String>> listenerMap = new TreeMap<String, Set<String>>();
290 Socket sock = null;
291 int retry = 5;
292
293 while (retry > 0) {
294 try {
295 sock = new Socket(splits[0], Integer.parseInt(splits[1]));
296 PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
297 BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
298
299 out.println("wchp");
300
301 listenerMap.clear();
302 String lastPath = null;
303 String line = in.readLine();
304 while (line != null) {
305 line = line.trim();
306
307 if (line.startsWith("/")) {
308 lastPath = line;
309 if (!listenerMap.containsKey(lastPath)) {
310 listenerMap.put(lastPath, new TreeSet<String>());
311 }
312 } else if (line.startsWith("0x")) {
313 if (lastPath != null && listenerMap.containsKey(lastPath)) {
314 listenerMap.get(lastPath).add(line);
315 } else {
316 LOG.error("Not path associated with listener sessionId: " + line + ", lastPath: "
317 + lastPath);
318 }
319 } else {
320 // LOG.error("unrecognized line: " + line);
321 }
322 line = in.readLine();
323 }
324 break;
325 } catch (Exception e) {
326 // sometimes in test, we see connection-reset exceptions when in.readLine()
327 // so add this retry logic
328 retry--;
329 } finally {
330 if (sock != null)
331 sock.close();
332 }
333 }
334 return listenerMap;
335 }
336
337 /**
338 * return a map from session-id to a set of zk-path that the session has watches on
339 * @return
340 */
341 public static Map<String, Set<String>> getListenersBySession(String zkAddr) throws Exception {
342 Map<String, Set<String>> listenerMapByInstance = getListenersByZkPath(zkAddr);
343
344 // convert to index by sessionId
345 Map<String, Set<String>> listenerMapBySession = new TreeMap<String, Set<String>>();
346 for (String path : listenerMapByInstance.keySet()) {
347 for (String sessionId : listenerMapByInstance.get(path)) {
348 if (!listenerMapBySession.containsKey(sessionId)) {
349 listenerMapBySession.put(sessionId, new TreeSet<String>());
350 }
351 listenerMapBySession.get(sessionId).add(path);
352 }
353 }
354
355 return listenerMapBySession;
356 }
357
358 static java.lang.reflect.Field getField(Class<?> clazz, String fieldName)
359 throws NoSuchFieldException {
360 try {
361 return clazz.getDeclaredField(fieldName);
362 } catch (NoSuchFieldException e) {
363 Class<?> superClass = clazz.getSuperclass();
364 if (superClass == null) {
365 throw e;
366 } else {
367 return getField(superClass, fieldName);
368 }
369 }
370 }
371
372 public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
373 Map<String, List<String>> lists = new HashMap<String, List<String>>();
374 ZkConnection connection = ((ZkConnection) client.getConnection());
375 ZooKeeper zk = connection.getZookeeper();
376
377 java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
378 field.setAccessible(true);
379 Object watchManager = field.get(zk);
380
381 java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches");
382 field2.setAccessible(true);
383 HashMap<String, Set<Watcher>> dataWatches =
384 (HashMap<String, Set<Watcher>>) field2.get(watchManager);
385
386 field2 = getField(watchManager.getClass(), "existWatches");
387 field2.setAccessible(true);
388 HashMap<String, Set<Watcher>> existWatches =
389 (HashMap<String, Set<Watcher>>) field2.get(watchManager);
390
391 field2 = getField(watchManager.getClass(), "childWatches");
392 field2.setAccessible(true);
393 HashMap<String, Set<Watcher>> childWatches =
394 (HashMap<String, Set<Watcher>>) field2.get(watchManager);
395
396 lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
397 lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
398 lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
399
400 return lists;
401 }
402
403 public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client)
404 throws Exception {
405 java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
406 field.setAccessible(true);
407 Map<String, Set<IZkDataListener>> dataListener =
408 (Map<String, Set<IZkDataListener>>) field.get(client);
409 return dataListener;
410 }
411
412 public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client)
413 throws Exception {
414 java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
415 field.setAccessible(true);
416 Map<String, Set<IZkChildListener>> childListener =
417 (Map<String, Set<IZkChildListener>>) field.get(client);
418 return childListener;
419 }
420
421 public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
422 java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
423 field.setAccessible(true);
424 Object eventThread = field.get(zkclient);
425 // System.out.println("field: " + eventThread);
426
427 java.lang.reflect.Field field2 = getField(eventThread.getClass(), "_events");
428 field2.setAccessible(true);
429 BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
430 // System.out.println("field2: " + queue + ", " + queue.size());
431
432 if (queue == null) {
433 LOG.error("fail to get event-queue from zkclient. skip waiting");
434 return false;
435 }
436
437 for (int i = 0; i < 20; i++) {
438 if (queue.size() == 0) {
439 return true;
440 }
441 Thread.sleep(100);
442 System.out.println("pending zk-events in queue: " + queue);
443 }
444 return false;
445 }
446 }

Copyright 2016, The Apache Software Foundation.