GIRAPH-1036: Allow mappers to fail early on exceptions
[giraph.git] / giraph-core / src / main / java / org / apache / giraph / utils / ReactiveJMapHistoDumper.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.utils;
20
21 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
22 import org.apache.giraph.conf.GiraphConstants;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.master.MasterObserver;
25 import org.apache.giraph.worker.WorkerObserver;
26 import org.apache.log4j.Logger;
27
28 /**
29 * An observer for both worker and master that periodically checks if available
30 * memory on heap is below certain threshold, and if found to be the case
31 * dumps jmap -histo for the process
32 */
33 public class ReactiveJMapHistoDumper extends
34 DefaultImmutableClassesGiraphConfigurable implements
35 MasterObserver, WorkerObserver {
36 /** Logger */
37 private static final Logger LOG = Logger.getLogger(
38 ReactiveJMapHistoDumper.class);
39 /** Size of mb */
40 private static final int MB = 1024 * 1024;
41
42 /** How many msec to sleep between calls */
43 private int sleepMillis;
44 /** How many lines of output to print */
45 private int linesToPrint;
46 /** How much free memory is expected */
47 private int minFreeMemory;
48
49 /** The jmap printing thread */
50 private Thread thread;
51 /** Halt jmap thread */
52 private volatile boolean stop = false;
53
54 @Override
55 public void preLoad() {
56 // This is called by both WorkerObserver and MasterObserver
57 startSupervisorThread();
58 }
59
60 @Override
61 public void postSave() {
62 // This is called by both WorkerObserver and MasterObserver
63 joinSupervisorThread();
64 }
65
66 @Override
67 public void preApplication() {
68 }
69
70 @Override
71 public void postApplication() {
72 }
73
74 /**
75 * Join the supervisor thread
76 */
77 private void joinSupervisorThread() {
78 stop = true;
79 try {
80 thread.join(sleepMillis + 5000);
81 } catch (InterruptedException e) {
82 LOG.error("Failed to join jmap thread");
83 }
84 }
85
86 /**
87 * Start the supervisor thread
88 */
89 public void startSupervisorThread() {
90 stop = false;
91 final Runtime runtime = Runtime.getRuntime();
92 thread = new Thread(new Runnable() {
93 @Override
94 public void run() {
95 try {
96 while (!stop) {
97 long potentialMemory = (runtime.maxMemory() -
98 runtime.totalMemory()) + runtime.freeMemory();
99 if (potentialMemory / MB < minFreeMemory) {
100 JMap.heapHistogramDump(linesToPrint);
101 }
102 Thread.sleep(sleepMillis);
103 }
104 } catch (InterruptedException e) {
105 LOG.warn("JMap histogram sleep interrupted", e);
106 }
107 }
108 });
109 thread.setName("ReactiveJMapHistoDumperSupervisorThread");
110 thread.setDaemon(true);
111 thread.start();
112 }
113
114 @Override
115 public void preSuperstep(long superstep) { }
116
117 @Override
118 public void postSuperstep(long superstep) { }
119
120 @Override
121 public void applicationFailed(Exception e) { }
122
123 @Override
124 public void setConf(ImmutableClassesGiraphConfiguration configuration) {
125 sleepMillis = GiraphConstants.JMAP_SLEEP_MILLIS.get(configuration);
126 linesToPrint = GiraphConstants.JMAP_PRINT_LINES.get(configuration);
127 minFreeMemory = GiraphConstants.MIN_FREE_MBS_ON_HEAP.get(configuration);
128 }
129 }