GIRAPH-1036: Allow mappers to fail early on exceptions
[giraph.git] / giraph-core / src / main / java / org / apache / giraph / comm / messages / queue / AsyncMessageStoreWrapper.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.comm.messages.queue;
19
import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
37
38 /**
39 * This class decouples message receiving and processing
40 * into separate threads thus reducing contention.
41 * It does not provide message store functionality itself, rather
42 * providing a wrapper around existing message stores that
43 * can now be used in async mode with only slight modifications.
44 * @param <I> Vertex id
45 * @param <M> Message data
46 */
47 public final class AsyncMessageStoreWrapper<I extends WritableComparable,
48 M extends Writable> implements MessageStore<I, M> {
49
50 /** Logger */
51 private static final Logger LOG =
52 Logger.getLogger(AsyncMessageStoreWrapper.class);
53 /** Pass this id to clear the queues and shutdown all threads
54 * started by this processor */
55 private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
56 new PartitionMessage(-1, null);
57 /** Pass this message to clear the queues but keep threads alive */
58 private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
59 new PartitionMessage(-1, null);
60 /** Executor that processes messages in background */
61 private static final ExecutorService EXECUTOR_SERVICE =
62 Executors.newCachedThreadPool(
63 ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
64
65 /** Number of threads that will process messages in background */
66 private final int threadsCount;
67 /** Queue that temporary stores messages */
68 private final BlockingQueue<PartitionMessage<I, M>>[] queues;
69 /** Map from partition id to thread that process this partition */
70 private final Int2IntMap partition2Queue;
71 /** Signals that all procesing is done */
72 private Semaphore completionSemaphore;
73 /** Underlying message store */
74 private final MessageStore<I, M> store;
75
76 /**
77 * Constructs async wrapper around existing message store
78 * object. Requires partition list and number of threads
79 * to properly initialize background threads and assign partitions.
80 * Partitions are assigned to threads in round-robin fashion.
81 * It guarantees that all threads have almost the same number of
82 * partitions (+-1) no matter how partitions are assigned to this worker.
83 * @param store underlying message store to be used in computation
84 * @param partitions partitions assigned to this worker
85 * @param threadCount number of threads that will be used to process
86 * messages.
87 */
88 public AsyncMessageStoreWrapper(MessageStore<I, M> store,
89 Iterable<Integer> partitions,
90 int threadCount) {
91 this.store = store;
92 this.threadsCount = threadCount;
93 completionSemaphore = new Semaphore(1 - threadsCount);
94 queues = new BlockingQueue[threadsCount];
95 partition2Queue = new Int2IntArrayMap();
96 LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
97
98 for (int i = 0; i < threadsCount; i++) {
99 queues[i] = new LinkedBlockingQueue<>();
100 EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
101 }
102
103 int cnt = 0;
104 for (int partitionId : partitions) {
105 partition2Queue.put(partitionId, cnt++ % threadsCount);
106 }
107
108 }
109
110 @Override
111 public boolean isPointerListEncoding() {
112 return store.isPointerListEncoding();
113 }
114
115 @Override
116 public Iterable<M> getVertexMessages(I vertexId) throws IOException {
117 return store.getVertexMessages(vertexId);
118 }
119
120 @Override
121 public void clearVertexMessages(I vertexId) throws IOException {
122 store.clearVertexMessages(vertexId);
123 }
124
125 @Override
126 public void clearAll() throws IOException {
127 try {
128 for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
129 queue.put(SHUTDOWN_QUEUE_MESSAGE);
130 }
131 completionSemaphore.acquire();
132 } catch (InterruptedException e) {
133 throw new RuntimeException(e);
134 }
135 store.clearAll();
136 }
137
138 @Override
139 public boolean hasMessagesForVertex(I vertexId) {
140 return store.hasMessagesForVertex(vertexId);
141 }
142
143 @Override
144 public boolean hasMessagesForPartition(int partitionId) {
145 return store.hasMessagesForPartition(partitionId);
146 }
147
148 @Override
149 public void addPartitionMessages(
150 int partitionId, VertexIdMessages<I, M> messages) throws IOException {
151 int hash = partition2Queue.get(partitionId);
152 try {
153 queues[hash].put(new PartitionMessage<>(partitionId, messages));
154 } catch (InterruptedException e) {
155 throw new RuntimeException(e);
156 }
157 }
158
159 @Override
160 public void finalizeStore() {
161 store.finalizeStore();
162 }
163
164 @Override
165 public Iterable<I> getPartitionDestinationVertices(int partitionId) {
166 return store.getPartitionDestinationVertices(partitionId);
167 }
168
169 @Override
170 public void clearPartition(int partitionId) throws IOException {
171 store.clearPartition(partitionId);
172 }
173
174 @Override
175 public void writePartition(DataOutput out, int partitionId)
176 throws IOException {
177 store.writePartition(out, partitionId);
178 }
179
180 @Override
181 public void readFieldsForPartition(DataInput in, int partitionId)
182 throws IOException {
183 store.readFieldsForPartition(in, partitionId);
184 }
185
186 /**
187 * Wait till all messages are processed and all queues are empty.
188 */
189 public void waitToComplete() {
190 try {
191 for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
192 queue.put(CLEAR_QUEUE_MESSAGE);
193 }
194 completionSemaphore.acquire();
195 completionSemaphore = new Semaphore(1 - threadsCount);
196 } catch (InterruptedException e) {
197 throw new RuntimeException(e);
198 }
199 }
200
201 /**
202 * This runnable has logic for background thread
203 * that actually does message processing.
204 */
205 private class MessageStoreQueueWorker implements Runnable {
206 /**
207 * Queue assigned to this background thread.
208 */
209 private final BlockingQueue<PartitionMessage<I, M>> queue;
210
211 /**
212 * Constructs runnable.
213 * @param queue where messages are put by client
214 */
215 private MessageStoreQueueWorker(
216 BlockingQueue<PartitionMessage<I, M>> queue) {
217 this.queue = queue;
218 }
219
220 @Override
221 public void run() {
222 PartitionMessage<I, M> message = null;
223 while (true) {
224 try {
225 message = queue.take();
226 if (message.getMessage() != null) {
227 int partitionId = message.getPartitionId();
228 store.addPartitionMessages(partitionId, message.getMessage());
229 } else {
230 completionSemaphore.release();
231 if (message == SHUTDOWN_QUEUE_MESSAGE) {
232 return;
233 }
234 }
235 } catch (IOException | InterruptedException e) {
236 LOG.error("MessageStoreQueueWorker.run: " + message, e);
237 return;
238 }
239 }
240 }
241 }
242 }