SAMZA-1692: Standalone stability fixes.
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ScheduleAfterDebounceTime.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.zk;
21
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.Optional;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35 * This class allows scheduling a Runnable actions after some debounce time.
36 * When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
37 * future in a map, keyed by the action name.
38 */
39 public class ScheduleAfterDebounceTime {
40 private static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
41 private static final String DEBOUNCE_THREAD_NAME_FORMAT = "Samza Debounce Thread-%s";
42
43 // timeout to wait for a task to complete.
44 private static final int TIMEOUT_MS = 1000 * 10;
45
46 /**
47 * {@link ScheduledTaskCallback} associated with the scheduler. OnError method of the
48 * callback will be invoked on first scheduled task failure.
49 */
50 private Optional<ScheduledTaskCallback> scheduledTaskCallback;
51
52 // Responsible for scheduling delayed actions.
53 private final ScheduledExecutorService scheduledExecutorService;
54
55 /**
56 * A map from actionName to {@link ScheduledFuture} of task scheduled for execution.
57 */
58 private final Map<String, ScheduledFuture> futureHandles = new ConcurrentHashMap<>();
59 private volatile boolean isShuttingDown;
60
61 public ScheduleAfterDebounceTime(String processorId) {
62 ThreadFactory threadFactory = new ThreadFactoryBuilder()
63 .setNameFormat(String.format(DEBOUNCE_THREAD_NAME_FORMAT, processorId))
64 .setDaemon(true).build();
65 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
66 isShuttingDown = false;
67 }
68
69 public void setScheduledTaskCallback(ScheduledTaskCallback scheduledTaskCallback) {
70 this.scheduledTaskCallback = Optional.ofNullable(scheduledTaskCallback);
71 }
72
73 /**
74 * Performs the following operations in sequential order.
75 * <ul>
76 * <li> Makes best effort to cancel any existing task in task queue associated with the action.</li>
77 * <li> Schedules the incoming action for later execution and records its future.</li>
78 * </ul>
79 *
80 * @param actionName the name of scheduleable action.
81 * @param delayInMillis the time from now to delay execution.
82 * @param runnable the action to execute.
83 */
84 public synchronized void scheduleAfterDebounceTime(String actionName, long delayInMillis, Runnable runnable) {
85 if (!isShuttingDown) {
86 // 1. Try to cancel any existing scheduled task associated with the action.
87 tryCancelScheduledAction(actionName);
88
89 // 2. Schedule the action.
90 ScheduledFuture scheduledFuture =
91 scheduledExecutorService.schedule(getScheduleableAction(actionName, runnable), delayInMillis, TimeUnit.MILLISECONDS);
92
93 LOG.info("Scheduled action: {} to run after: {} milliseconds.", actionName, delayInMillis);
94 futureHandles.put(actionName, scheduledFuture);
95 } else {
96 LOG.info("Scheduler is stopped. Not scheduling action: {} to run.", actionName);
97 }
98 }
99
100
101 public synchronized void cancelAction(String action) {
102 if (!isShuttingDown) {
103 this.tryCancelScheduledAction(action);
104 }
105 }
106
107
108 /**
109 * Stops the scheduler. After this invocation no further schedule calls will be accepted
110 * and all pending enqueued tasks will be cancelled.
111 */
112 public synchronized void stopScheduler() {
113 if (isShuttingDown) {
114 LOG.debug("Debounce timer shutdown is already in progress!");
115 return;
116 }
117
118 isShuttingDown = true;
119 LOG.info("Shutting down debounce timer!");
120
121 // changing it back to use shutdown instead to prevent interruptions on the active task
122 scheduledExecutorService.shutdown();
123
124 // should clear out the future handles as well
125 futureHandles.keySet()
126 .forEach(this::tryCancelScheduledAction);
127 }
128
129 /**
130 * Tries to cancel the task that belongs to {@code actionName} submitted to the queue.
131 *
132 * @param actionName the name of action to cancel.
133 */
134 private void tryCancelScheduledAction(String actionName) {
135 LOG.info("Trying to cancel the action: {}.", actionName);
136 ScheduledFuture scheduledFuture = futureHandles.get(actionName);
137 if (scheduledFuture != null && !scheduledFuture.isDone()) {
138 LOG.info("Attempting to cancel the future of action: {}", actionName);
139 // Attempt to cancel
140 if (!scheduledFuture.cancel(false)) {
141 try {
142 scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
143 } catch (Exception e) {
144 // we ignore the exception
145 LOG.warn("Cancelling the future of action: {} failed.", actionName, e);
146 }
147 }
148 futureHandles.remove(actionName);
149 }
150 }
151
152 /**
153 * Decorate the executable action with exception handlers to facilitate cleanup on failures.
154 *
155 * @param actionName the name of the scheduleable action.
156 * @param runnable the action to execute.
157 * @return the executable action decorated with exception handlers.
158 */
159 private Runnable getScheduleableAction(String actionName, Runnable runnable) {
160 return () -> {
161 try {
162 if (!isShuttingDown) {
163 runnable.run();
164 /*
165 * Expects all run() implementations <b>not to swallow the interrupts.</b>
166 * This thread is interrupted from an external source(mostly executor service) to die.
167 */
168 if (Thread.currentThread().isInterrupted()) {
169 LOG.warn("Action: {} is interrupted.", actionName);
170 doCleanUpOnTaskException(new InterruptedException());
171 } else {
172 LOG.info("Action: {} completed successfully.", actionName);
173 }
174 }
175 } catch (Throwable throwable) {
176 LOG.error("Execution of action: {} failed.", actionName, throwable);
177 doCleanUpOnTaskException(throwable);
178 }
179 };
180 }
181
182 /**
183 * Handler method to invoke on a exception during an scheduled task execution and which
184 * the following operations in sequential order.
185 * <ul>
186 * <li> Stop the scheduler. If the task execution fails or a task is interrupted, scheduler will not accept/execute any new tasks.</li>
187 * <li> Invokes the onError handler method if taskCallback is defined.</li>
188 * </ul>
189 *
190 * @param throwable the exception happened during task execution.
191 */
192 private void doCleanUpOnTaskException(Throwable throwable) {
193 stopScheduler();
194
195 scheduledTaskCallback.ifPresent(callback -> callback.onError(throwable));
196 }
197
198 /**
199 * A ScheduledTaskCallback::onError() is invoked on first occurrence of exception
200 * when executing a task. Provides plausible hook for handling failures
201 * in an asynchronous scheduled task execution.
202 */
203 interface ScheduledTaskCallback {
204 void onError(Throwable throwable);
205 }
206 }