1981c9067a1e5c0981eccd9cb2c1c7820308ef74
[curator.git] / curator-client / src / main / java / org / apache / curator / CuratorZookeeperClient.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.curator;
21
22 import com.google.common.base.Preconditions;
23 import org.apache.curator.connection.ConnectionHandlingPolicy;
24 import org.apache.curator.connection.StandardConnectionHandlingPolicy;
25 import org.apache.curator.drivers.OperationTrace;
26 import org.apache.curator.drivers.TracerDriver;
27 import org.apache.curator.ensemble.EnsembleProvider;
28 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
29 import org.apache.curator.utils.DefaultTracerDriver;
30 import org.apache.curator.utils.DefaultZookeeperFactory;
31 import org.apache.curator.utils.ThreadUtils;
32 import org.apache.curator.utils.ZookeeperFactory;
33 import org.apache.zookeeper.WatchedEvent;
34 import org.apache.zookeeper.Watcher;
35 import org.apache.zookeeper.ZooKeeper;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import java.io.Closeable;
39 import java.io.IOException;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 /**
46 * A wrapper around Zookeeper that takes care of some low-level housekeeping
47 */
48 @SuppressWarnings("UnusedDeclaration")
49 public class CuratorZookeeperClient implements Closeable
50 {
51 private final Logger log = LoggerFactory.getLogger(getClass());
52 private final ConnectionState state;
53 private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
54 private final int connectionTimeoutMs;
55 private final int waitForShutdownTimeoutMs;
56 private final AtomicBoolean started = new AtomicBoolean(false);
57 private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver());
58 private final ConnectionHandlingPolicy connectionHandlingPolicy;
59
60 /**
61 *
62 * @param connectString list of servers to connect to
63 * @param sessionTimeoutMs session timeout
64 * @param connectionTimeoutMs connection timeout
65 * @param watcher default watcher or null
66 * @param retryPolicy the retry policy to use
67 */
68 public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
69 {
70 this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
71 }
72
73 /**
74 * @param ensembleProvider the ensemble provider
75 * @param sessionTimeoutMs session timeout
76 * @param connectionTimeoutMs connection timeout
77 * @param watcher default watcher or null
78 * @param retryPolicy the retry policy to use
79 */
80 public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
81 {
82 this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new StandardConnectionHandlingPolicy());
83 }
84
85 /**
86 * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
87 * @param ensembleProvider the ensemble provider
88 * @param sessionTimeoutMs session timeout
89 * @param connectionTimeoutMs connection timeout
90 * @param watcher default watcher or null
91 * @param retryPolicy the retry policy to use
92 * @param canBeReadOnly if true, allow ZooKeeper client to enter
93 * read only mode in case of a network partition. See
94 * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
95 * for details
96 */
97 public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
98 {
99 this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new StandardConnectionHandlingPolicy());
100 }
101
102 /**
103 * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
104 * @param ensembleProvider the ensemble provider
105 * @param sessionTimeoutMs session timeout
106 * @param connectionTimeoutMs connection timeout
107 * @param watcher default watcher or null
108 * @param retryPolicy the retry policy to use
109 * @param canBeReadOnly if true, allow ZooKeeper client to enter
110 * read only mode in case of a network partition. See
111 * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
112 * for details
113 * @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own
114 * @since 3.0.0
115 */
116 public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
117 this(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, 0,
118 watcher, retryPolicy, canBeReadOnly, connectionHandlingPolicy);
119 }
120 /**
121 * @param zookeeperFactory factory for creating {@link ZooKeeper} instances
122 * @param ensembleProvider the ensemble provider
123 * @param sessionTimeoutMs session timeout
124 * @param connectionTimeoutMs connection timeout
125 * @param defaultWaitForShutdownTimeoutMs default timeout fo close operation
126 * @param watcher default watcher or null
127 * @param retryPolicy the retry policy to use
128 * @param canBeReadOnly if true, allow ZooKeeper client to enter
129 * read only mode in case of a network partition. See
130 * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
131 * for details
132 * @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own
133 * @since 4.0.2
134 */
135 public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
136 int sessionTimeoutMs, int connectionTimeoutMs, int defaultWaitForShutdownTimeoutMs, Watcher watcher,
137 RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
138 {
139 this.connectionHandlingPolicy = connectionHandlingPolicy;
140 if ( sessionTimeoutMs < connectionTimeoutMs )
141 {
142 log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
143 }
144
145 retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
146 ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
147
148 this.connectionTimeoutMs = connectionTimeoutMs;
149 this.waitForShutdownTimeoutMs = defaultWaitForShutdownTimeoutMs;
150 state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
151 setRetryPolicy(retryPolicy);
152 }
153
154 /**
155 * Return the managed ZK instance.
156 *
157 * @return client the client
158 * @throws Exception if the connection timeout has elapsed or an exception occurs in a background process
159 */
160 public ZooKeeper getZooKeeper() throws Exception
161 {
162 Preconditions.checkState(started.get(), "Client is not started");
163
164 return state.getZooKeeper();
165 }
166
167 /**
168 * Return a new retry loop. All operations should be performed in a retry loop
169 *
170 * @return new retry loop
171 */
172 public RetryLoop newRetryLoop()
173 {
174 return new RetryLoop(retryPolicy.get(), tracer);
175 }
176
177 /**
178 * Return a new "session fail" retry loop. See {@link SessionFailRetryLoop} for details
179 * on when to use it.
180 *
181 * @param mode failure mode
182 * @return new retry loop
183 */
184 public SessionFailRetryLoop newSessionFailRetryLoop(SessionFailRetryLoop.Mode mode)
185 {
186 return new SessionFailRetryLoop(this, mode);
187 }
188
189 /**
190 * Returns true if the client is current connected
191 *
192 * @return true/false
193 */
194 public boolean isConnected()
195 {
196 return state.isConnected();
197 }
198
199 /**
200 * This method blocks until the connection to ZK succeeds. Use with caution. The block
201 * will timeout after the connection timeout (as passed to the constructor) has elapsed
202 *
203 * @return true if the connection succeeded, false if not
204 * @throws InterruptedException interrupted while waiting
205 */
206 public boolean blockUntilConnectedOrTimedOut() throws InterruptedException
207 {
208 Preconditions.checkState(started.get(), "Client is not started");
209
210 log.debug("blockUntilConnectedOrTimedOut() start");
211 OperationTrace trace = startAdvancedTracer("blockUntilConnectedOrTimedOut");
212
213 internalBlockUntilConnectedOrTimedOut();
214
215 trace.commit();
216
217 boolean localIsConnected = state.isConnected();
218 log.debug("blockUntilConnectedOrTimedOut() end. isConnected: " + localIsConnected);
219
220 return localIsConnected;
221 }
222
223 /**
224 * Must be called after construction
225 *
226 * @throws IOException errors
227 */
228 public void start() throws Exception
229 {
230 log.debug("Starting");
231
232 if ( !started.compareAndSet(false, true) )
233 {
234 throw new IllegalStateException("Already started");
235 }
236
237 state.start();
238 }
239
240 /**
241 * Close the client.
242 *
243 * Same as {@link #close(int) } using the timeout set at construction time.
244 *
245 * @see #close(int)
246 */
247 @Override
248 public void close() {
249 close(waitForShutdownTimeoutMs);
250 }
251
252 /**
253 * Close this client object as the {@link #close() } method.
254 * This method will wait for internal resources to be released.
255 *
256 * @param waitForShutdownTimeoutMs timeout (in milliseconds) to wait for resources to be released.
257 * Use zero or a negative value to skip the wait.
258 */
259 public void close(int waitForShutdownTimeoutMs)
260 {
261 log.debug("Closing, waitForShutdownTimeoutMs {}", waitForShutdownTimeoutMs);
262
263 started.set(false);
264 try
265 {
266 state.close(waitForShutdownTimeoutMs);
267 }
268 catch ( IOException e )
269 {
270 ThreadUtils.checkInterrupted(e);
271 log.error("", e);
272 }
273 }
274
275 /**
276 * Change the retry policy
277 *
278 * @param policy new policy
279 */
280 public void setRetryPolicy(RetryPolicy policy)
281 {
282 Preconditions.checkNotNull(policy, "policy cannot be null");
283
284 retryPolicy.set(policy);
285 }
286
287 /**
288 * Return the current retry policy
289 *
290 * @return policy
291 */
292 public RetryPolicy getRetryPolicy()
293 {
294 return retryPolicy.get();
295 }
296
297 /**
298 * Start a new tracer
299 * @param name name of the event
300 * @return the new tracer ({@link TimeTrace#commit()} must be called)
301 */
302 public TimeTrace startTracer(String name)
303 {
304 return new TimeTrace(name, tracer.get());
305 }
306
307 /**
308 * Start a new advanced tracer with more metrics being recorded
309 * @param name name of the event
310 * @return the new tracer ({@link OperationTrace#commit()} must be called)
311 */
312 public OperationTrace startAdvancedTracer(String name)
313 {
314 return new OperationTrace(name, tracer.get(), state.getSessionId());
315 }
316
317 /**
318 * Return the current tracing driver
319 *
320 * @return tracing driver
321 */
322 public TracerDriver getTracerDriver()
323 {
324 return tracer.get();
325 }
326
327 /**
328 * Change the tracing driver
329 *
330 * @param tracer new tracing driver
331 */
332 public void setTracerDriver(TracerDriver tracer)
333 {
334 this.tracer.set(tracer);
335 }
336
337 /**
338 * Returns the current known connection string - not guaranteed to be correct
339 * value at any point in the future.
340 *
341 * @return connection string
342 */
343 public String getCurrentConnectionString()
344 {
345 return state.getEnsembleProvider().getConnectionString();
346 }
347
348 /**
349 * Return the configured connection timeout
350 *
351 * @return timeout
352 */
353 public int getConnectionTimeoutMs()
354 {
355 return connectionTimeoutMs;
356 }
357
358 /**
359 * For internal use only - reset the internally managed ZK handle
360 *
361 * @throws Exception errors
362 */
363 public void reset() throws Exception
364 {
365 state.reset();
366 }
367
368 /**
369 * Every time a new {@link ZooKeeper} instance is allocated, the "instance index"
370 * is incremented.
371 *
372 * @return the current instance index
373 */
374 public long getInstanceIndex()
375 {
376 return state.getInstanceIndex();
377 }
378
379 /**
380 * Return the configured connection handling policy
381 *
382 * @return ConnectionHandlingPolicy
383 */
384 public ConnectionHandlingPolicy getConnectionHandlingPolicy()
385 {
386 return connectionHandlingPolicy;
387 }
388
389 /**
390 * Return the most recent value of {@link ZooKeeper#getSessionTimeout()} or 0
391 *
392 * @return session timeout or 0
393 */
394 public int getLastNegotiatedSessionTimeoutMs()
395 {
396 return state.getLastNegotiatedSessionTimeoutMs();
397 }
398
399 void addParentWatcher(Watcher watcher)
400 {
401 state.addParentWatcher(watcher);
402 }
403
404 void removeParentWatcher(Watcher watcher)
405 {
406 state.removeParentWatcher(watcher);
407 }
408
409 /**
410 * For internal use only
411 *
412 * @throws InterruptedException interruptions
413 */
414 public void internalBlockUntilConnectedOrTimedOut() throws InterruptedException
415 {
416 long waitTimeMs = connectionTimeoutMs;
417 while ( !state.isConnected() && (waitTimeMs > 0) )
418 {
419 final CountDownLatch latch = new CountDownLatch(1);
420 Watcher tempWatcher = new Watcher()
421 {
422 @Override
423 public void process(WatchedEvent event)
424 {
425 latch.countDown();
426 }
427 };
428
429 state.addParentWatcher(tempWatcher);
430 long startTimeMs = System.currentTimeMillis();
431 try
432 {
433 latch.await(1, TimeUnit.SECONDS);
434 }
435 finally
436 {
437 state.removeParentWatcher(tempWatcher);
438 }
439 long elapsed = Math.max(1, System.currentTimeMillis() - startTimeMs);
440 waitTimeMs -= elapsed;
441 }
442 }
443 }