08a7914d79fcc098a7c8707847369eab98bfb445
[giraph.git] / giraph-core / src / main / java / org / apache / giraph / utils / ProgressableUtils.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.utils;
20
21 import org.apache.hadoop.util.Progressable;
22 import org.apache.log4j.Logger;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.group.ChannelGroupFuture;
25 import io.netty.util.concurrent.EventExecutorGroup;
26
27 import com.google.common.collect.Lists;
28 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38
39 /** Functions for waiting on some events to happen while reporting progress */
40 public class ProgressableUtils {
41 /** Class logger */
42 private static final Logger LOG =
43 Logger.getLogger(ProgressableUtils.class);
44 /** Msecs to refresh the progress meter (one minute) */
45 private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
46
47 /** Do not instantiate. */
48 private ProgressableUtils() {
49 }
50
51 /**
52 * Wait for executor tasks to terminate, while periodically reporting
53 * progress.
54 *
55 * @param executor Executor which we are waiting for
56 * @param progressable Progressable for reporting progress (Job context)
57 * @param msecsPeriod How often to report progress
58 */
59 public static void awaitExecutorTermination(ExecutorService executor,
60 Progressable progressable, int msecsPeriod) {
61 waitForever(new ExecutorServiceWaitable(executor), progressable,
62 msecsPeriod);
63 }
64
65 /**
66 * Wait for executor tasks to terminate, while periodically reporting
67 * progress.
68 *
69 * @param executor Executor which we are waiting for
70 * @param progressable Progressable for reporting progress (Job context)
71 */
72 public static void awaitExecutorTermination(ExecutorService executor,
73 Progressable progressable) {
74 waitForever(new ExecutorServiceWaitable(executor), progressable);
75 }
76
77 /**
78 * Wait for executorgroup to terminate, while periodically reporting progress
79 *
80 * @param group ExecutorGroup whose termination we are awaiting
81 * @param progressable Progressable for reporting progress (Job context)
82 */
83 public static void awaitTerminationFuture(EventExecutorGroup group,
84 Progressable progressable) {
85 waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
86 }
87
88 /**
89 * Wait for the result of the future to be ready, while periodically
90 * reporting progress.
91 *
92 * @param <T> Type of the return value of the future
93 * @param future Future
94 * @param progressable Progressable for reporting progress (Job context)
95 * @return Computed result of the future.
96 */
97 public static <T> T getFutureResult(Future<T> future,
98 Progressable progressable) {
99 return waitForever(new FutureWaitable<T>(future), progressable);
100 }
101
102 /**
103 * Wait for {@link ChannelGroupFuture} to finish, while periodically
104 * reporting progress.
105 *
106 * @param future ChannelGroupFuture
107 * @param progressable Progressable for reporting progress (Job context)
108 */
109 public static void awaitChannelGroupFuture(ChannelGroupFuture future,
110 Progressable progressable) {
111 waitForever(new ChannelGroupFutureWaitable(future), progressable);
112 }
113
114 /**
115 * Wait for {@link ChannelFuture} to finish, while periodically
116 * reporting progress.
117 *
118 * @param future ChannelFuture
119 * @param progressable Progressable for reporting progress (Job context)
120 */
121 public static void awaitChannelFuture(ChannelFuture future,
122 Progressable progressable) {
123 waitForever(new ChannelFutureWaitable(future), progressable);
124 }
125
126 /**
127 * Wait forever for waitable to finish. Periodically reports progress.
128 *
129 * @param waitable Waitable which we wait for
130 * @param progressable Progressable for reporting progress (Job context)
131 * @param <T> Result type
132 * @return Result of waitable
133 */
134 private static <T> T waitForever(Waitable<T> waitable,
135 Progressable progressable) {
136 return waitForever(waitable, progressable, DEFUALT_MSEC_PERIOD);
137 }
138
139 /**
140 * Wait forever for waitable to finish. Periodically reports progress.
141 *
142 * @param waitable Waitable which we wait for
143 * @param progressable Progressable for reporting progress (Job context)
144 * @param msecsPeriod How often to report progress
145 * @param <T> Result type
146 * @return Result of waitable
147 */
148 private static <T> T waitForever(Waitable<T> waitable,
149 Progressable progressable, int msecsPeriod) {
150 while (true) {
151 waitFor(waitable, progressable, msecsPeriod, msecsPeriod);
152 if (waitable.isFinished()) {
153 try {
154 return waitable.getResult();
155 } catch (ExecutionException e) {
156 throw new IllegalStateException("waitForever: " +
157 "ExecutionException occurred while waiting for " + waitable, e);
158 } catch (InterruptedException e) {
159 throw new IllegalStateException("waitForever: " +
160 "InterruptedException occurred while waiting for " + waitable, e);
161 }
162 }
163 }
164 }
165
166 /**
167 * Wait for desired number of milliseconds for waitable to finish.
168 * Periodically reports progress.
169 *
170 * @param waitable Waitable which we wait for
171 * @param progressable Progressable for reporting progress (Job context)
172 * @param msecs Number of milliseconds to wait for
173 * @param msecsPeriod How often to report progress
174 * @param <T> Result type
175 * @return Result of waitable
176 */
177 private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
178 int msecs, int msecsPeriod) {
179 long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
180 int currentWaitMsecs;
181 while (true) {
182 progressable.progress();
183 currentWaitMsecs = Math.min(msecs, msecsPeriod);
184 try {
185 waitable.waitFor(currentWaitMsecs);
186 if (waitable.isFinished()) {
187 return waitable.getResult();
188 }
189 } catch (InterruptedException e) {
190 throw new IllegalStateException("waitFor: " +
191 "InterruptedException occurred while waiting for " + waitable, e);
192 } catch (ExecutionException e) {
193 throw new IllegalStateException("waitFor: " +
194 "ExecutionException occurred while waiting for " + waitable, e);
195 }
196 if (LOG.isInfoEnabled()) {
197 LOG.info("waitFor: Waiting for " + waitable);
198 }
199 if (System.currentTimeMillis() >= timeoutTimeMsecs) {
200 return waitable.getTimeoutResult();
201 }
202 msecs = Math.max(0, msecs - currentWaitMsecs);
203 }
204 }
205
206 /**
207 * Create {#link numThreads} callables from {#link callableFactory},
208 * execute them and gather results.
209 *
210 * @param callableFactory Factory for Callables
211 * @param numThreads Number of threads to use
212 * @param threadNameFormat Format for thread name
213 * @param progressable Progressable for reporting progress
214 * @param <R> Type of Callable's results
215 * @return List of results from Callables
216 */
217 public static <R> List<R> getResultsWithNCallables(
218 CallableFactory<R> callableFactory, int numThreads,
219 String threadNameFormat, Progressable progressable) {
220 ExecutorService executorService =
221 Executors.newFixedThreadPool(numThreads,
222 new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());
223 List<Future<R>> futures = Lists.newArrayListWithCapacity(numThreads);
224 for (int i = 0; i < numThreads; i++) {
225 Callable<R> callable = callableFactory.newCallable(i);
226 Future<R> future = executorService.submit(
227 new LogStacktraceCallable<R>(callable));
228 futures.add(future);
229 }
230 executorService.shutdown();
231 List<R> futureResults = Lists.newArrayListWithCapacity(numThreads);
232 for (Future<R> future : futures) {
233 R result = ProgressableUtils.getFutureResult(future, progressable);
234 futureResults.add(result);
235 }
236 return futureResults;
237 }
238
239 /**
240 * Interface for waiting on a result from some operation.
241 *
242 * @param <T> Result type.
243 */
244 private interface Waitable<T> {
245 /**
246 * Wait for desired number of milliseconds for waitable to finish.
247 *
248 * @param msecs Number of milliseconds to wait.
249 */
250 void waitFor(int msecs) throws InterruptedException, ExecutionException;
251
252 /**
253 * Check if waitable is finished.
254 *
255 * @return True iff waitable finished.
256 */
257 boolean isFinished();
258
259 /**
260 * Get result of waitable. Call after isFinished() returns true.
261 *
262 * @return Result of waitable.
263 */
264 T getResult() throws ExecutionException, InterruptedException;
265
266 /**
267 * Get the result which we want to return in case of timeout.
268 *
269 * @return Timeout result.
270 */
271 T getTimeoutResult();
272 }
273
274 /**
275 * abstract class for waitables which don't have the result.
276 */
277 private abstract static class WaitableWithoutResult
278 implements Waitable<Void> {
279 @Override
280 public Void getResult() throws ExecutionException, InterruptedException {
281 return null;
282 }
283
284 @Override
285 public Void getTimeoutResult() {
286 return null;
287 }
288 }
289
290 /**
291 * {@link Waitable} for waiting on a result of a {@link Future}.
292 *
293 * @param <T> Future result type
294 */
295 private static class FutureWaitable<T> implements Waitable<T> {
296 /** Future which we want to wait for */
297 private final Future<T> future;
298
299 /**
300 * Constructor
301 *
302 * @param future Future which we want to wait for
303 */
304 public FutureWaitable(Future<T> future) {
305 this.future = future;
306 }
307
308 @Override
309 public void waitFor(int msecs) throws InterruptedException,
310 ExecutionException {
311 try {
312 future.get(msecs, TimeUnit.MILLISECONDS);
313 } catch (TimeoutException e) {
314 if (LOG.isInfoEnabled()) {
315 LOG.info("waitFor: Future result not ready yet " + future);
316 }
317 }
318 }
319
320 @Override
321 public boolean isFinished() {
322 return future.isDone();
323 }
324
325 @Override
326 public T getResult() throws ExecutionException, InterruptedException {
327 return future.get();
328 }
329
330 @Override
331 public T getTimeoutResult() {
332 return null;
333 }
334 }
335
336 /**
337 * {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
338 */
339 private static class ExecutorServiceWaitable extends WaitableWithoutResult {
340 /** ExecutorService which we want to wait for */
341 private final ExecutorService executorService;
342
343 /**
344 * Constructor
345 *
346 * @param executorService ExecutorService which we want to wait for
347 */
348 public ExecutorServiceWaitable(ExecutorService executorService) {
349 this.executorService = executorService;
350 }
351
352 @Override
353 public void waitFor(int msecs) throws InterruptedException {
354 executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
355 }
356
357 @Override
358 public boolean isFinished() {
359 return executorService.isTerminated();
360 }
361 }
362
363 /**
364 * {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
365 * terminate.
366 */
367 private static class ChannelGroupFutureWaitable extends
368 WaitableWithoutResult {
369 /** ChannelGroupFuture which we want to wait for */
370 private final ChannelGroupFuture future;
371
372 /**
373 * Constructor
374 *
375 * @param future ChannelGroupFuture which we want to wait for
376 */
377 public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
378 this.future = future;
379 }
380
381 @Override
382 public void waitFor(int msecs) throws InterruptedException {
383 future.await(msecs, TimeUnit.MILLISECONDS);
384 }
385
386 @Override
387 public boolean isFinished() {
388 return future.isDone();
389 }
390 }
391
392 /**
393 * {@link Waitable} for waiting on a {@link ChannelFuture} to
394 * terminate.
395 */
396 private static class ChannelFutureWaitable extends WaitableWithoutResult {
397 /** ChannelGroupFuture which we want to wait for */
398 private final ChannelFuture future;
399
400 /**
401 * Constructor
402 *
403 * @param future ChannelFuture which we want to wait for
404 */
405 public ChannelFutureWaitable(ChannelFuture future) {
406 this.future = future;
407 }
408
409 @Override
410 public void waitFor(int msecs) throws InterruptedException {
411 future.await(msecs, TimeUnit.MILLISECONDS);
412 }
413
414 @Override
415 public boolean isFinished() {
416 return future.isDone();
417 }
418 }
419 }