SAMZA-1304: Handling duplicate stream processor registration.
[samza.git] / samza-core / src / test / java / org / apache / samza / zk / TestZkUtils.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.zk;
20
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.function.BooleanSupplier;
25 import org.I0Itec.zkclient.IZkDataListener;
26 import org.I0Itec.zkclient.ZkClient;
27 import org.I0Itec.zkclient.ZkConnection;
28 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
29 import org.apache.samza.SamzaException;
30 import org.apache.samza.config.MapConfig;
31 import org.apache.samza.job.model.ContainerModel;
32 import org.apache.samza.job.model.JobModel;
33 import org.apache.samza.testUtils.EmbeddedZookeeper;
34 import org.apache.samza.util.NoOpMetricsRegistry;
35 import org.junit.After;
36 import org.junit.AfterClass;
37 import org.junit.Assert;
38 import org.junit.Before;
39 import org.junit.BeforeClass;
40 import org.junit.Rule;
41 import org.junit.rules.ExpectedException;
42 import org.junit.Test;
43
44 public class TestZkUtils {
45 private static EmbeddedZookeeper zkServer = null;
46 private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
47 private ZkClient zkClient = null;
48 private static final int SESSION_TIMEOUT_MS = 20000;
49 private static final int CONNECTION_TIMEOUT_MS = 10000;
50 private ZkUtils zkUtils;
51
52 @Rule
53 // Declared public to honor junit contract.
54 public final ExpectedException expectedException = ExpectedException.none();
55
56 @BeforeClass
57 public static void setup() throws InterruptedException {
58 zkServer = new EmbeddedZookeeper();
59 zkServer.setup();
60 }
61
62 @Before
63 public void testSetup() {
64 try {
65 zkClient = new ZkClient(
66 new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
67 CONNECTION_TIMEOUT_MS);
68 } catch (Exception e) {
69 Assert.fail("Client connection setup failed. Aborting tests..");
70 }
71 try {
72 zkClient.createPersistent(KEY_BUILDER.getProcessorsPath(), true);
73 } catch (ZkNodeExistsException e) {
74 // Do nothing
75 }
76
77 zkUtils = getZkUtils();
78
79 zkUtils.connect();
80 }
81
82 @After
83 public void testTeardown() {
84 zkUtils.close();
85 zkClient.close();
86 }
87
88 private ZkUtils getZkUtils() {
89 return new ZkUtils(KEY_BUILDER, zkClient,
90 SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
91 }
92
93 @AfterClass
94 public static void teardown() {
95 zkServer.teardown();
96 }
97
98 @Test
99 public void testRegisterProcessorId() {
100 String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
101 Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
102
103 // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
104 Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath));
105
106 }
107
108 @Test
109 public void testGetActiveProcessors() {
110 Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size());
111 zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));
112 Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size());
113 }
114
115 @Test
116 public void testGetProcessorsIDs() {
117 Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
118 zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
119 List<String> l = zkUtils.getSortedActiveProcessorsIDs();
120 Assert.assertEquals(1, l.size());
121 new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2"));
122 l = zkUtils.getSortedActiveProcessorsIDs();
123 Assert.assertEquals(2, l.size());
124
125 Assert.assertEquals(" ID1 didn't match", "1", l.get(0));
126 Assert.assertEquals(" ID2 didn't match", "2", l.get(1));
127 }
128
129 @Test
130 public void testSubscribeToJobModelVersionChange() {
131
132 ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
133 String root = keyBuilder.getRootPath();
134 zkClient.deleteRecursive(root);
135
136 class Result {
137 String res = "";
138 public String getRes() {
139 return res;
140 }
141 public void updateRes(String newRes) {
142 res = newRes;
143 }
144 }
145
146 Assert.assertFalse(zkUtils.exists(root));
147
148 // create the paths
149 zkUtils.makeSurePersistentPathsExists(
150 new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
151 Assert.assertTrue(zkUtils.exists(root));
152 Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
153 Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
154
155 final Result res = new Result();
156 // define the callback
157 IZkDataListener dataListener = new IZkDataListener() {
158 @Override
159 public void handleDataChange(String dataPath, Object data)
160 throws Exception {
161 res.updateRes((String) data);
162 }
163
164 @Override
165 public void handleDataDeleted(String dataPath)
166 throws Exception {
167 Assert.fail("Data wasn't deleted;");
168 }
169 };
170 // subscribe
171 zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
172 zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener);
173 // update
174 zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion");
175
176 // verify
177 Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000));
178
179 // update again
180 zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor");
181
182 Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
183 }
184
185 /**
186 * Create two duplicate processors with same processorId.
187 * Second creation should fail with exception.
188 */
189 @Test
190 public void testRegisterProcessorAndGetIdShouldFailForDuplicateProcessorRegistration() {
191 final String testHostName = "localhost";
192 final String testProcessId = "testProcessorId";
193 ProcessorData processorData1 = new ProcessorData(testHostName, testProcessId);
194 // Register processor 1 which is not duplicate, this registration should succeed.
195 zkUtils.registerProcessorAndGetId(processorData1);
196
197 ZkUtils zkUtils1 = getZkUtils();
198 zkUtils1.connect();
199 ProcessorData duplicateProcessorData = new ProcessorData(testHostName, testProcessId);
200 // Registration of the duplicate processor should fail.
201 expectedException.expect(SamzaException.class);
202 zkUtils1.registerProcessorAndGetId(duplicateProcessorData);
203 }
204
205 @Test
206 public void testPublishNewJobModel() {
207 ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
208 String root = keyBuilder.getRootPath();
209 zkClient.deleteRecursive(root);
210 String version = "1";
211 String oldVersion = "0";
212
213 zkUtils.makeSurePersistentPathsExists(
214 new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()});
215
216 zkUtils.publishJobModelVersion(oldVersion, version);
217 Assert.assertEquals(version, zkUtils.getJobModelVersion());
218
219 String newerVersion = Long.toString(Long.valueOf(version) + 1);
220 zkUtils.publishJobModelVersion(version, newerVersion);
221 Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion());
222
223 try {
224 zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version
225 Assert.fail("publish invalid version should've failed");
226 } catch (SamzaException e) {
227 // expected
228 }
229
230 // create job model
231 Map<String, String> configMap = new HashMap<>();
232 Map<String, ContainerModel> containers = new HashMap<>();
233 MapConfig config = new MapConfig(configMap);
234 JobModel jobModel = new JobModel(config, containers);
235
236 zkUtils.publishJobModel(version, jobModel);
237 Assert.assertEquals(jobModel, zkUtils.getJobModel(version));
238 }
239
240 public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
241 long delay = startDelayMs;
242 while (delay < maxDelayMs) {
243 if (cond.getAsBoolean())
244 return true;
245 try {
246 Thread.sleep(delay);
247 } catch (InterruptedException e) {
248 return false;
249 }
250 delay *= 2;
251 }
252 return false;
253 }
254
255 public static void sleepMs(long delay) {
256 try {
257 Thread.sleep(delay);
258 } catch (InterruptedException e) {
259 Assert.fail("Sleep was interrupted");
260 }
261 }
262 }