a33bf032af834e422741e4a203907c00940ce447
[samza.git] / samza-core / src / test / java / org / apache / samza / zk / TestZkUtils.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.zk;
20
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.function.BooleanSupplier;
25 import org.I0Itec.zkclient.IZkDataListener;
26 import org.I0Itec.zkclient.ZkClient;
27 import org.I0Itec.zkclient.ZkConnection;
28 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
29 import org.apache.samza.SamzaException;
30 import org.apache.samza.config.MapConfig;
31 import org.apache.samza.job.model.ContainerModel;
32 import org.apache.samza.job.model.JobModel;
33 import org.apache.samza.testUtils.EmbeddedZookeeper;
34 import org.apache.samza.util.NoOpMetricsRegistry;
35 import org.junit.After;
36 import org.junit.AfterClass;
37 import org.junit.Assert;
38 import org.junit.Before;
39 import org.junit.BeforeClass;
40 import org.junit.Test;
41
42 public class TestZkUtils {
43 private static EmbeddedZookeeper zkServer = null;
44 private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
45 private ZkClient zkClient = null;
46 private static final int SESSION_TIMEOUT_MS = 20000;
47 private static final int CONNECTION_TIMEOUT_MS = 10000;
48 private ZkUtils zkUtils;
49
50 @BeforeClass
51 public static void setup() throws InterruptedException {
52 zkServer = new EmbeddedZookeeper();
53 zkServer.setup();
54 }
55
56 @Before
57 public void testSetup() {
58 try {
59 zkClient = new ZkClient(
60 new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
61 CONNECTION_TIMEOUT_MS);
62 } catch (Exception e) {
63 Assert.fail("Client connection setup failed. Aborting tests..");
64 }
65 try {
66 zkClient.createPersistent(KEY_BUILDER.getProcessorsPath(), true);
67 } catch (ZkNodeExistsException e) {
68 // Do nothing
69 }
70
71 zkUtils = new ZkUtils(
72 KEY_BUILDER,
73 zkClient,
74 SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
75
76 zkUtils.connect();
77 }
78
79 @After
80 public void testTeardown() {
81 zkUtils.close();
82 zkClient.close();
83 }
84
85 @AfterClass
86 public static void teardown() {
87 zkServer.teardown();
88 }
89
90 @Test
91 public void testRegisterProcessorId() {
92 String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
93 Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
94
95 // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
96 Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath));
97
98 }
99
100 @Test
101 public void testGetActiveProcessors() {
102 Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size());
103 zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));
104 Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size());
105 }
106
107 @Test
108 public void testGetProcessorsIDs() {
109 Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
110 zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
111 List<String> l = zkUtils.getSortedActiveProcessorsIDs();
112 Assert.assertEquals(1, l.size());
113 new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2"));
114 l = zkUtils.getSortedActiveProcessorsIDs();
115 Assert.assertEquals(2, l.size());
116
117 Assert.assertEquals(" ID1 didn't match", "1", l.get(0));
118 Assert.assertEquals(" ID2 didn't match", "2", l.get(1));
119 }
120
121 @Test
122 public void testSubscribeToJobModelVersionChange() {
123
124 ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
125 String root = keyBuilder.getRootPath();
126 zkClient.deleteRecursive(root);
127
128 class Result {
129 String res = "";
130 public String getRes() {
131 return res;
132 }
133 public void updateRes(String newRes) {
134 res = newRes;
135 }
136 }
137
138 Assert.assertFalse(zkUtils.exists(root));
139
140 // create the paths
141 zkUtils.makeSurePersistentPathsExists(
142 new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
143 Assert.assertTrue(zkUtils.exists(root));
144 Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
145 Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
146
147 final Result res = new Result();
148 // define the callback
149 IZkDataListener dataListener = new IZkDataListener() {
150 @Override
151 public void handleDataChange(String dataPath, Object data)
152 throws Exception {
153 res.updateRes((String) data);
154 }
155
156 @Override
157 public void handleDataDeleted(String dataPath)
158 throws Exception {
159 Assert.fail("Data wasn't deleted;");
160 }
161 };
162 // subscribe
163 zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
164 zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener);
165 // update
166 zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion");
167
168 // verify
169 Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000));
170
171 // update again
172 zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor");
173
174 Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
175 }
176
177 @Test
178 public void testPublishNewJobModel() {
179 ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
180 String root = keyBuilder.getRootPath();
181 zkClient.deleteRecursive(root);
182 String version = "1";
183 String oldVersion = "0";
184
185 zkUtils.makeSurePersistentPathsExists(
186 new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
187
188 zkUtils.publishJobModelVersion(oldVersion, version);
189 Assert.assertEquals(version, zkUtils.getJobModelVersion());
190
191 String newerVersion = Long.toString(Long.valueOf(version) + 1);
192 zkUtils.publishJobModelVersion(version, newerVersion);
193 Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
194
195 try {
196 zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
197 Assert.fail("publish invalid version should've failed");
198 } catch (SamzaException e) {
199 // expected
200 }
201
202 // create job model
203 Map<String, String> configMap = new HashMap<>();
204 Map<String, ContainerModel> containers = new HashMap<>();
205 MapConfig config = new MapConfig(configMap);
206 JobModel jobModel = new JobModel(config, containers);
207
208 zkUtils.publishJobModel(version, jobModel);
209 Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
210 }
211
212 public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
213 long delay = startDelayMs;
214 while (delay < maxDelayMs) {
215 if (cond.getAsBoolean())
216 return true;
217 try {
218 Thread.sleep(delay);
219 } catch (InterruptedException e) {
220 return false;
221 }
222 delay *= 2;
223 }
224 return false;
225 }
226
227 public static void sleepMs(long delay) {
228 try {
229 Thread.sleep(delay);
230 } catch (InterruptedException e) {
231 Assert.fail("Sleep was interrupted");
232 }
233 }
234 }