SAMZA-1692: Standalone stability fixes.
[samza.git] / samza-core / src / test / java / org / apache / samza / zk / TestScheduleAfterDebounceTime.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.zk;
21
22 import org.junit.Assert;
23 import org.junit.Rule;
24 import org.junit.Test;
25 import org.junit.rules.Timeout;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31
32 public class TestScheduleAfterDebounceTime {
33 private static final Logger LOG = LoggerFactory.getLogger(TestScheduleAfterDebounceTime.class);
34
35 private static final long WAIT_TIME = 500;
36
37 private static final String TEST_PROCESSOR_ID = "TEST_PROCESSOR_ID";
38
39 @Rule
40 public Timeout testTimeOutInSeconds = new Timeout(10, TimeUnit.SECONDS);
41
42 class TestObj {
43 private volatile int i = 0;
44 public void inc() {
45 i++;
46 }
47 public void setTo(int val) {
48 i = val;
49 }
50 public int get() {
51 return i;
52 }
53 }
54
55 @Test
56 public void testSchedule() throws InterruptedException {
57 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
58 final CountDownLatch latch = new CountDownLatch(1);
59
60 final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
61 scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () -> {
62 testObj.inc();
63 latch.countDown();
64 });
65 // action is delayed
66 Assert.assertEquals(0, testObj.get());
67
68 boolean result = latch.await(WAIT_TIME * 2, TimeUnit.MILLISECONDS);
69 Assert.assertTrue("Latch timed-out and task was not scheduled on time.", result);
70 Assert.assertEquals(1, testObj.get());
71
72 scheduledQueue.stopScheduler();
73 }
74
75 @Test
76 public void testCancelAndSchedule() throws InterruptedException {
77 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
78 final CountDownLatch test1Latch = new CountDownLatch(1);
79
80 final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
81 scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, testObj::inc);
82 // next schedule should cancel the previous one with the same name
83 scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () ->
84 {
85 testObj.inc();
86 test1Latch.countDown();
87 }
88 );
89
90 final TestObj testObj2 = new TestScheduleAfterDebounceTime.TestObj();
91 // this schedule should not cancel the previous one, because it has different name
92 scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME, testObj2::inc);
93
94 boolean result = test1Latch.await(4 * WAIT_TIME, TimeUnit.MILLISECONDS);
95 Assert.assertTrue("Latch timed-out. Scheduled tasks were not run correctly.", result);
96 Assert.assertEquals(1, testObj.get());
97 Assert.assertEquals(1, testObj2.get());
98
99 scheduledQueue.stopScheduler();
100 }
101
102 @Test
103 public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
104 final CountDownLatch latch = new CountDownLatch(1);
105 final Throwable[] taskCallbackException = new Exception[1];
106 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
107 scheduledQueue.setScheduledTaskCallback(throwable -> {
108 taskCallbackException[0] = throwable;
109 latch.countDown();
110 });
111
112 scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () ->
113 {
114 throw new RuntimeException("From the runnable!");
115 });
116
117 final TestObj testObj = new TestObj();
118 scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME * 2, testObj::inc);
119
120 boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS);
121 Assert.assertTrue("Latch timed-out.", result);
122 Assert.assertEquals(0, testObj.get());
123 Assert.assertEquals(RuntimeException.class, taskCallbackException[0].getClass());
124 scheduledQueue.stopScheduler();
125 }
126
127 @Test
128 public void testNewTasksScheduledAfterShutdownDoesNotThrowException() throws InterruptedException {
129 ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(TEST_PROCESSOR_ID);
130
131 scheduledQueue.stopScheduler();
132 scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () -> Assert.fail("New event should not be scheduled"));
133 }
134 }