GIRAPH-1036: Allow mappers to fail early on exceptions
[giraph.git] / giraph-core / src / main / java / org / apache / giraph / ooc / AdaptiveOutOfCoreEngine.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.ooc;
20
21 import com.google.common.collect.Lists;
22 import org.apache.giraph.bsp.CentralizedServiceWorker;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.utils.CallableFactory;
25 import org.apache.giraph.utils.LogStacktraceCallable;
26 import org.apache.giraph.utils.ThreadUtils;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.log4j.Logger;
30
31 import java.util.List;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.BrokenBarrierException;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.CyclicBarrier;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 /**
44 * Adaptive out-of-core mechanism. This mechanism spawns two types of threads:
45 * 1) check-memory thread, which periodically monitors the amount of available
46 * memory and decides whether data should go on disk. This threads is
47 * basically the brain behind the out-of-core mechanism, commands
48 * "out-of-core processor threads" (type 2 thread below) to move
49 * appropriate data to disk,
50 * 2) out-of-core processor threads. This is a team of threads responsible for
51 * offloading appropriate data to disk. "check-memory thread" decides on
52 * which data should go to disk, and "out-of-core processor threads" do the
53 * offloading.
54 *
55 * @param <I> Vertex id
56 * @param <V> Vertex value
57 * @param <E> Edge data
58 */
59 public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
60 V extends Writable, E extends Writable> implements
61 OutOfCoreEngine<I, V, E> {
62 /** Class logger. */
63 private static final Logger LOG =
64 Logger.getLogger(AdaptiveOutOfCoreEngine.class);
65
66 // ---- Synchronization Variables ----
67 /** Barrier to coordinate check-memory and OOC-processing threads */
68 private final CyclicBarrier gate;
69 /**
70 * Signal to determine whether OOC processing threads are done processing OOC
71 * requests
72 */
73 private final CyclicBarrier doneOocSignal;
74 /** Signal to determine whether the computation is terminated */
75 private final CountDownLatch doneCompute;
76 /** Finisher signal to OOC processing threads */
77 private volatile boolean done;
78
79 // ---- OOC Commands ----
80 /**
81 * List of partitions that are on disk, and their loaded *vertices* during
82 * INPUT_SUPERSTEP are ready to be flushed to disk
83 */
84 private final BlockingQueue<Integer> partitionsWithInputVertices;
85 /**
86 * List of partitions that are on disk, and their loaded *edges* during
87 * INPUT_SUPERSTEP are ready to be flushed to disk
88 */
89 private final BlockingQueue<Integer> partitionsWithInputEdges;
90 /**
91 * List of partitions that are on disk, and their message buffers (either
92 * messages for current superstep, or incoming messages for next superstep)
93 * are ready to be flushed to disk
94 */
95 private final BlockingQueue<Integer> partitionsWithPendingMessages;
96 /** Number of partitions to be written to disk */
97 private final AtomicInteger numPartitionsToSpill;
98
99 /** Executor service for check memory thread */
100 private ExecutorService checkMemoryExecutor;
101 /** Executor service for out-of-core processor threads */
102 private ExecutorService outOfCoreProcessorExecutor;
103
104 /** Configuration */
105 private ImmutableClassesGiraphConfiguration<I, V, E> conf;
106 /** Worker */
107 private final CentralizedServiceWorker<I, V, E> serviceWorker;
108
109 /** Cached value for number of out-of-core threads specified by user */
110 private int numOocThreads;
111
112 /** Result of check-memory thread (to be checked for graceful termination) */
113 private Future<Void> checkMemoryResult;
114 /**
115 * Results of out-of-core processor threads (to be checked for graceful
116 * termination)
117 */
118 private List<Future<Void>> oocProcessorResults;
119
120 /**
121 * Creates an instance of adaptive mechanism
122 * @param conf Configuration
123 * @param serviceWorker Worker service
124 */
125 public AdaptiveOutOfCoreEngine(ImmutableClassesGiraphConfiguration conf,
126 CentralizedServiceWorker<I, V, E> serviceWorker) {
127 this.conf = conf;
128 this.serviceWorker = serviceWorker;
129
130 this.numOocThreads = conf.getNumOocThreads();
131 this.gate = new CyclicBarrier(numOocThreads + 1);
132 this.doneOocSignal = new CyclicBarrier(numOocThreads + 1);
133 this.doneCompute = new CountDownLatch(1);
134 this.done = false;
135 this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100);
136 this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100);
137 this.partitionsWithPendingMessages = new ArrayBlockingQueue<Integer>(100);
138 this.numPartitionsToSpill = new AtomicInteger(0);
139 }
140
141 @Override
142 public void initialize() {
143 if (LOG.isInfoEnabled()) {
144 LOG.info("initialize: initializing out-of-core engine");
145 }
146 CallableFactory<Void> checkMemoryCallableFactory =
147 new CallableFactory<Void>() {
148 @Override
149 public Callable<Void> newCallable(int callableId) {
150 return new CheckMemoryCallable<I, V, E>(
151 AdaptiveOutOfCoreEngine.this, conf, serviceWorker);
152 }
153 };
154 checkMemoryExecutor = Executors.newSingleThreadExecutor(
155 ThreadUtils.createThreadFactory("check-memory"));
156 checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>(
157 checkMemoryCallableFactory.newCallable(0)));
158
159 CallableFactory<Void> outOfCoreProcessorCallableFactory =
160 new CallableFactory<Void>() {
161 @Override
162 public Callable<Void> newCallable(int callableId) {
163 return new OutOfCoreProcessorCallable<I, V, E>(
164 AdaptiveOutOfCoreEngine.this, serviceWorker);
165 }
166 };
167 outOfCoreProcessorExecutor =
168 Executors.newFixedThreadPool(numOocThreads,
169 ThreadUtils.createThreadFactory("ooc-%d"));
170 oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads);
171 for (int i = 0; i < numOocThreads; ++i) {
172 Future<Void> future = outOfCoreProcessorExecutor.submit(
173 new LogStacktraceCallable<>(
174 outOfCoreProcessorCallableFactory.newCallable(i)));
175 oocProcessorResults.add(future);
176 }
177 }
178
179 @Override
180 public void shutdown() {
181 doneCompute.countDown();
182 checkMemoryExecutor.shutdown();
183 if (checkMemoryResult.isCancelled()) {
184 throw new IllegalStateException(
185 "shutdown: memory check thread did not " + "terminate gracefully!");
186 }
187 outOfCoreProcessorExecutor.shutdown();
188 for (int i = 0; i < numOocThreads; ++i) {
189 if (oocProcessorResults.get(i).isCancelled()) {
190 throw new IllegalStateException("shutdown: out-of-core processor " +
191 "thread " + i + " did not terminate gracefully.");
192 }
193 }
194 }
195
196 /**
197 * @return the latch that signals whether the whole computation is done
198 */
199 public CountDownLatch getDoneCompute() {
200 return doneCompute;
201 }
202
203 /**
204 * @return whether the computation is done
205 */
206 public boolean isDone() {
207 return done;
208 }
209
210 /**
211 * @return list of partitions that have large enough buffers of vertices read
212 * in INPUT_SUPERSTEP.
213 */
214 public BlockingQueue<Integer> getPartitionsWithInputVertices() {
215 return partitionsWithInputVertices;
216 }
217
218 /**
219 * @return list of partitions that have large enough buffers of edges read
220 * in INPUT_SUPERSTEP.
221 */
222 public BlockingQueue<Integer> getPartitionsWithInputEdges() {
223 return partitionsWithInputEdges;
224 }
225
226 /**
227 * @return list of partitions that have large enough message buffers.
228 */
229 public BlockingQueue<Integer> getPartitionsWithPendingMessages() {
230 return partitionsWithPendingMessages;
231 }
232
233 /**
234 * @return number of partitions to spill to disk
235 */
236 public AtomicInteger getNumPartitionsToSpill() {
237 return numPartitionsToSpill;
238 }
239
240 /**
241 * Wait on gate with which OOC processor threads are notified to execute
242 * commands provided by brain (memory-check thread).
243 *
244 * @throws BrokenBarrierException
245 * @throws InterruptedException
246 */
247 public void waitOnGate() throws BrokenBarrierException, InterruptedException {
248 gate.await();
249 }
250
251 /**
252 * Reset the gate for reuse.
253 */
254 public void resetGate() {
255 gate.reset();
256 }
257
258 /**
259 * Wait on signal from all OOC processor threads that the offloading of data
260 * is complete.
261 *
262 * @throws BrokenBarrierException
263 * @throws InterruptedException
264 */
265 public void waitOnOocSignal()
266 throws BrokenBarrierException, InterruptedException {
267 doneOocSignal.await();
268 }
269
270 /**
271 * Reset the completion signal of OOC processor threads for reuse.
272 */
273 public void resetOocSignal() {
274 doneOocSignal.reset();
275 }
276
277 /**
278 * Set the computation as done (i.e. setting the state that determines the
279 * whole computation is done).
280 */
281 public void setDone() {
282 done = true;
283 }
284 }