7f687d791a544cec56e3d55b70da756e98374fc4
[samza.git] / samza-core / src / test / java / org / apache / samza / zk / TestScheduleAfterDebounceTime.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.zk;
21
22 import org.junit.Assert;
23 import org.junit.Rule;
24 import org.junit.Test;
25 import org.junit.rules.Timeout;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31
32 public class TestScheduleAfterDebounceTime {
33 private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class);
34
35 private static final long WAIT_TIME = 500;
36
37 @Rule
38 public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
39
40 class TestObj {
41 private volatile int i = 0;
42 public void inc() {
43 i++;
44 }
45 public void setTo(int val) {
46 i = val;
47 }
48 public int get() {
49 return i;
50 }
51 }
52
53 @Test
54 public void testSchedule() throws InterruptedException {
55 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
56 final CountDownLatch latch = new CountDownLatch(1);
57
58 final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
59 scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () -> {
60 testObj.inc();
61 latch.countDown();
62 });
63 // action is delayed
64 Assert.assertEquals(0, testObj.get());
65
66 boolean result = latch.await(WAIT_TIME * 2, TimeUnit.MILLISECONDS);
67 Assert.assertTrue("Latch timed-out and task was not scheduled on time.", result);
68 Assert.assertEquals(1, testObj.get());
69
70 scheduledQueue.stopScheduler();
71 }
72
73 @Test
74 public void testCancelAndSchedule() throws InterruptedException {
75 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
76 final CountDownLatch test1Latch = new CountDownLatch(1);
77
78 final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
79 scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, testObj::inc);
80 // next schedule should cancel the previous one with the same name
81 scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () ->
82 {
83 testObj.inc();
84 test1Latch.countDown();
85 }
86 );
87
88 final TestObj testObj2 = new TestScheduleAfterDebounceTime.TestObj();
89 // this schedule should not cancel the previous one, because it has different name
90 scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME, testObj2::inc);
91
92 boolean result = test1Latch.await(4 * WAIT_TIME, TimeUnit.MILLISECONDS);
93 Assert.assertTrue("Latch timed-out. Scheduled tasks were not run correctly.", result);
94 Assert.assertEquals(1, testObj.get());
95 Assert.assertEquals(1, testObj2.get());
96
97 scheduledQueue.stopScheduler();
98 }
99
100 @Test
101 public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
102 final CountDownLatch latch = new CountDownLatch(1);
103 final Throwable[] taskCallbackException = new Exception[1];
104 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime();
105 scheduledQueue.setScheduledTaskCallback(throwable -> {
106 taskCallbackException[0] = throwable;
107 latch.countDown();
108 });
109
110 scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () ->
111 {
112 throw new RuntimeException("From the runnable!");
113 });
114
115 final TestObj testObj = new TestObj();
116 scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME * 2, testObj::inc);
117
118 boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS);
119 Assert.assertTrue("Latch timed-out.", result);
120 Assert.assertEquals(0, testObj.get());
121 Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
122 scheduledQueue.stopScheduler();
123 }
124 }