[HELIX-350]cluster status monitor should not be reset in FINALIZE type pipeline updat...
[helix.git] / helix-core / src / test / java / org / apache / helix / ZkTestHelper.java
1 package org.apache.helix;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import java.io.BufferedReader;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.PrintWriter;
26 import java.net.Socket;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.TreeMap;
33 import java.util.TreeSet;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.CountDownLatch;
36
37 import org.I0Itec.zkclient.IZkChildListener;
38 import org.I0Itec.zkclient.IZkDataListener;
39 import org.I0Itec.zkclient.IZkStateListener;
40 import org.I0Itec.zkclient.ZkConnection;
41 import org.apache.helix.PropertyKey.Builder;
42 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
43 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
44 import org.apache.helix.manager.zk.ZkClient;
45 import org.apache.helix.model.ExternalView;
46 import org.apache.log4j.Logger;
47 import org.apache.zookeeper.WatchedEvent;
48 import org.apache.zookeeper.Watcher;
49 import org.apache.zookeeper.Watcher.Event.EventType;
50 import org.apache.zookeeper.Watcher.Event.KeeperState;
51 import org.apache.zookeeper.ZooKeeper;
52 import org.apache.zookeeper.ZooKeeper.States;
53 import org.testng.Assert;
54
55 public class ZkTestHelper {
56   private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
57
58   static {
59     // Logger.getRootLogger().setLevel(Level.DEBUG);
60   }
61
62   public static void disconnectSession(final ZkClient zkClient) throws Exception {
63     IZkStateListener listener = new IZkStateListener() {
64       @Override
65       public void handleStateChanged(KeeperState state) throws Exception {
66         // System.err.println("disconnectSession handleStateChanged. state: " + state);
67       }
68
69       @Override
70       public void handleNewSession() throws Exception {
71         // make sure zkclient is connected again
72         zkClient.waitUntilConnected();
73
74         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
75         ZooKeeper curZookeeper = connection.getZookeeper();
76
77         LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
78       }
79     };
80
81     zkClient.subscribeStateChanges(listener);
82     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
83     ZooKeeper curZookeeper = connection.getZookeeper();
84     LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
85
86     Watcher watcher = new Watcher() {
87       @Override
88       public void process(WatchedEvent event) {
89         LOG.info("Process watchEvent: " + event);
90       }
91     };
92
93     final ZooKeeper dupZookeeper =
94         new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher,
95             curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
96     // wait until connected, then close
97     while (dupZookeeper.getState() != States.CONNECTED) {
98       Thread.sleep(10);
99     }
100     dupZookeeper.close();
101
102     connection = (ZkConnection) zkClient.getConnection();
103     curZookeeper = connection.getZookeeper();
104     zkClient.unsubscribeStateChanges(listener);
105
106     // System.err.println("zk: " + oldZookeeper);
107     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
108   }
109
110   /**
111    * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
112    */
113   public static void simulateZkStateDisconnected(ZkClient client) {
114     WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
115     client.process(event);
116   }
117
118   /**
119    * Get zk connection session id
120    * @param client
121    * @return
122    */
123   public static String getSessionId(ZkClient client) {
124     ZkConnection connection = ((ZkConnection) client.getConnection());
125     ZooKeeper curZookeeper = connection.getZookeeper();
126     return Long.toHexString(curZookeeper.getSessionId());
127   }
128
129   /**
130    * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
131    * @param zkClient
132    * @throws Exception
133    */
134   public static void expireSession(final ZkClient zkClient) throws Exception {
135     final CountDownLatch waitNewSession = new CountDownLatch(1);
136
137     IZkStateListener listener = new IZkStateListener() {
138       @Override
139       public void handleStateChanged(KeeperState state) throws Exception {
140         LOG.info("IZkStateListener#handleStateChanged, state: " + state);
141       }
142
143       @Override
144       public void handleNewSession() throws Exception {
145         // make sure zkclient is connected again
146         zkClient.waitUntilConnected();
147
148         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
149         ZooKeeper curZookeeper = connection.getZookeeper();
150
151         LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
152         waitNewSession.countDown();
153       }
154     };
155
156     zkClient.subscribeStateChanges(listener);
157
158     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
159     ZooKeeper curZookeeper = connection.getZookeeper();
160     String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
161     LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " + curZookeeper);
162
163     Watcher watcher = new Watcher() {
164       @Override
165       public void process(WatchedEvent event) {
166         LOG.info("Watcher#process, event: " + event);
167       }
168     };
169
170     final ZooKeeper dupZookeeper =
171         new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher,
172             curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
173     // wait until connected, then close
174     while (dupZookeeper.getState() != States.CONNECTED) {
175       Thread.sleep(10);
176     }
177     Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED, "Fail to connect to zk using current session info");
178     dupZookeeper.close();
179
180     // make sure session expiry really happens
181     waitNewSession.await();
182     zkClient.unsubscribeStateChanges(listener);
183
184     connection = (ZkConnection) zkClient.getConnection();
185     curZookeeper = connection.getZookeeper();
186
187     String newSessionId = Long.toHexString(curZookeeper.getSessionId());
188     LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
189     Assert.assertNotSame(newSessionId, oldSessionId, "Fail to expire current session, zk: " + curZookeeper);
190   }
191
192   /**
193    * expire zk session asynchronously
194    * @param zkClient
195    * @throws Exception
196    */
197   public static void asyncExpireSession(final ZkClient zkClient) throws Exception {
198     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
199     ZooKeeper curZookeeper = connection.getZookeeper();
200     LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
201
202     Watcher watcher = new Watcher() {
203       @Override
204       public void process(WatchedEvent event) {
205         LOG.info("Process watchEvent: " + event);
206       }
207     };
208
209     final ZooKeeper dupZookeeper =
210         new ZooKeeper(connection.getServers(), curZookeeper.getSessionTimeout(), watcher,
211             curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
212     // wait until connected, then close
213     while (dupZookeeper.getState() != States.CONNECTED) {
214       Thread.sleep(10);
215     }
216     dupZookeeper.close();
217
218     connection = (ZkConnection) zkClient.getConnection();
219     curZookeeper = connection.getZookeeper();
220
221     // System.err.println("zk: " + oldZookeeper);
222     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
223   }
224
225   /*
226    * stateMap: partition->instance->state
227    */
228   public static boolean verifyState(ZkClient zkclient, String clusterName, String resourceName,
229       Map<String, Map<String, String>> expectStateMap, String op) {
230     boolean result = true;
231     ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
232     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
233     Builder keyBuilder = accessor.keyBuilder();
234
235     ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
236     Map<String, Map<String, String>> actualStateMap = extView.getRecord().getMapFields();
237     for (String partition : actualStateMap.keySet()) {
238       for (String expectPartiton : expectStateMap.keySet()) {
239         if (!partition.matches(expectPartiton)) {
240           continue;
241         }
242
243         Map<String, String> actualInstanceStateMap = actualStateMap.get(partition);
244         Map<String, String> expectInstanceStateMap = expectStateMap.get(expectPartiton);
245         for (String instance : actualInstanceStateMap.keySet()) {
246           for (String expectInstance : expectStateMap.get(expectPartiton).keySet()) {
247             if (!instance.matches(expectInstance)) {
248               continue;
249             }
250
251             String actualState = actualInstanceStateMap.get(instance);
252             String expectState = expectInstanceStateMap.get(expectInstance);
253             boolean equals = expectState.equals(actualState);
254             if (op.equals("==") && !equals || op.equals("!=") && equals) {
255               System.out.println(partition + "/" + instance + " state mismatch. actual state: "
256                   + actualState + ", but expect: " + expectState + ", op: " + op);
257               result = false;
258             }
259           }
260         }
261       }
262     }
263     return result;
264   }
265
266   /**
267    * return the number of listeners on given zk-path
268    * @param zkAddr
269    * @param path
270    * @return
271    * @throws Exception
272    */
273   public static int numberOfListeners(String zkAddr, String path) throws Exception {
274     Map<String, Set<String>> listenerMap = getListenersByZkPath(zkAddr);
275     if (listenerMap.containsKey(path)) {
276       return listenerMap.get(path).size();
277     }
278     return 0;
279   }
280
281   /**
282    * return a map from zk-path to a set of zk-session-id that put watches on the zk-path
283    * @param zkAddr
284    * @return
285    * @throws Exception
286    */
287   public static Map<String, Set<String>> getListenersByZkPath(String zkAddr) throws Exception {
288     String splits[] = zkAddr.split(":");
289     Map<String, Set<String>> listenerMap = new TreeMap<String, Set<String>>();
290     Socket sock = null;
291     int retry = 5;
292
293     while (retry > 0) {
294       try {
295         sock = new Socket(splits[0], Integer.parseInt(splits[1]));
296         PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
297         BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
298
299         out.println("wchp");
300
301         listenerMap.clear();
302         String lastPath = null;
303         String line = in.readLine();
304         while (line != null) {
305           line = line.trim();
306
307           if (line.startsWith("/")) {
308             lastPath = line;
309             if (!listenerMap.containsKey(lastPath)) {
310               listenerMap.put(lastPath, new TreeSet<String>());
311             }
312           } else if (line.startsWith("0x")) {
313             if (lastPath != null && listenerMap.containsKey(lastPath)) {
314               listenerMap.get(lastPath).add(line);
315             } else {
316               LOG.error("Not path associated with listener sessionId: " + line + ", lastPath: "
317                   + lastPath);
318             }
319           } else {
320             // LOG.error("unrecognized line: " + line);
321           }
322           line = in.readLine();
323         }
324         break;
325       } catch (Exception e) {
326         // sometimes in test, we see connection-reset exceptions when in.readLine()
327         // so add this retry logic
328         retry--;
329       } finally {
330         if (sock != null)
331           sock.close();
332       }
333     }
334     return listenerMap;
335   }
336
337   /**
338    * return a map from session-id to a set of zk-path that the session has watches on
339    * @return
340    */
341   public static Map<String, Set<String>> getListenersBySession(String zkAddr) throws Exception {
342     Map<String, Set<String>> listenerMapByInstance = getListenersByZkPath(zkAddr);
343
344     // convert to index by sessionId
345     Map<String, Set<String>> listenerMapBySession = new TreeMap<String, Set<String>>();
346     for (String path : listenerMapByInstance.keySet()) {
347       for (String sessionId : listenerMapByInstance.get(path)) {
348         if (!listenerMapBySession.containsKey(sessionId)) {
349           listenerMapBySession.put(sessionId, new TreeSet<String>());
350         }
351         listenerMapBySession.get(sessionId).add(path);
352       }
353     }
354
355     return listenerMapBySession;
356   }
357
358   static java.lang.reflect.Field getField(Class<?> clazz, String fieldName)
359       throws NoSuchFieldException {
360     try {
361       return clazz.getDeclaredField(fieldName);
362     } catch (NoSuchFieldException e) {
363       Class<?> superClass = clazz.getSuperclass();
364       if (superClass == null) {
365         throw e;
366       } else {
367         return getField(superClass, fieldName);
368       }
369     }
370   }
371
372   public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
373     Map<String, List<String>> lists = new HashMap<String, List<String>>();
374     ZkConnection connection = ((ZkConnection) client.getConnection());
375     ZooKeeper zk = connection.getZookeeper();
376
377     java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
378     field.setAccessible(true);
379     Object watchManager = field.get(zk);
380
381     java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches");
382     field2.setAccessible(true);
383     HashMap<String, Set<Watcher>> dataWatches =
384         (HashMap<String, Set<Watcher>>) field2.get(watchManager);
385
386     field2 = getField(watchManager.getClass(), "existWatches");
387     field2.setAccessible(true);
388     HashMap<String, Set<Watcher>> existWatches =
389         (HashMap<String, Set<Watcher>>) field2.get(watchManager);
390
391     field2 = getField(watchManager.getClass(), "childWatches");
392     field2.setAccessible(true);
393     HashMap<String, Set<Watcher>> childWatches =
394         (HashMap<String, Set<Watcher>>) field2.get(watchManager);
395
396     lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
397     lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
398     lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
399
400     return lists;
401   }
402
403   public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client)
404       throws Exception {
405     java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
406     field.setAccessible(true);
407     Map<String, Set<IZkDataListener>> dataListener =
408         (Map<String, Set<IZkDataListener>>) field.get(client);
409     return dataListener;
410   }
411
412   public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client)
413       throws Exception {
414     java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
415     field.setAccessible(true);
416     Map<String, Set<IZkChildListener>> childListener =
417         (Map<String, Set<IZkChildListener>>) field.get(client);
418     return childListener;
419   }
420
421   public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
422     java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
423     field.setAccessible(true);
424     Object eventThread = field.get(zkclient);
425     // System.out.println("field: " + eventThread);
426
427     java.lang.reflect.Field field2 = getField(eventThread.getClass(), "_events");
428     field2.setAccessible(true);
429     BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
430     // System.out.println("field2: " + queue + ", " + queue.size());
431
432     if (queue == null) {
433       LOG.error("fail to get event-queue from zkclient. skip waiting");
434       return false;
435     }
436
437     for (int i = 0; i < 20; i++) {
438       if (queue.size() == 0) {
439         return true;
440       }
441       Thread.sleep(100);
442       System.out.println("pending zk-events in queue: " + queue);
443     }
444     return false;
445   }
446 }