SAMZA-1304: Handling duplicate stream processor registration.
[samza.git] / samza-test / src / test / java / org / apache / samza / test / processor / TestZkLocalApplicationRunner.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.test.processor;
21
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import java.io.Serializable;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Properties;
32 import java.util.UUID;
33 import java.util.concurrent.CountDownLatch;
34 import kafka.admin.AdminUtils;
35 import kafka.utils.TestUtils;
36 import org.I0Itec.zkclient.ZkClient;
37 import org.apache.kafka.clients.producer.KafkaProducer;
38 import org.apache.kafka.clients.producer.ProducerRecord;
39 import org.apache.samza.SamzaException;
40 import org.apache.samza.application.StreamApplication;
41 import org.apache.samza.config.ApplicationConfig;
42 import org.apache.samza.config.Config;
43 import org.apache.samza.config.JobConfig;
44 import org.apache.samza.config.JobCoordinatorConfig;
45 import org.apache.samza.config.MapConfig;
46 import org.apache.samza.config.TaskConfig;
47 import org.apache.samza.config.ZkConfig;
48 import org.apache.samza.job.ApplicationStatus;
49 import org.apache.samza.job.model.JobModel;
50 import org.apache.samza.operators.MessageStream;
51 import org.apache.samza.operators.OutputStream;
52 import org.apache.samza.operators.StreamGraph;
53 import org.apache.samza.runtime.LocalApplicationRunner;
54 import org.apache.samza.test.StandaloneIntegrationTestHarness;
55 import org.apache.samza.test.StandaloneTestUtils;
56 import org.apache.samza.util.NoOpMetricsRegistry;
57 import org.apache.samza.zk.ZkKeyBuilder;
58 import org.apache.samza.zk.ZkUtils;
59 import org.junit.Rule;
60 import org.junit.Test;
61 import org.junit.rules.ExpectedException;
62 import org.junit.rules.Timeout;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 import static org.junit.Assert.*;
67
68 /**
69 * Integration tests for {@link LocalApplicationRunner}.
70 *
71 * Brings up embedded ZooKeeper, Kafka broker and launches multiple {@link StreamApplication} through
72 * {@link LocalApplicationRunner} to verify the guarantees made in stand alone execution environment.
73 */
74 public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarness {
75
  private static final Logger LOGGER = LoggerFactory.getLogger(TestZkLocalApplicationRunner.class);

  // Number of events published to the input topic per test; also the count on kafkaEventsConsumedLatch.
  private static final int NUM_KAFKA_EVENTS = 300;
  // Session/connection timeout handed to ZkUtils for the test-side ZK client.
  private static final int ZK_CONNECTION_TIMEOUT_MS = 10000;
  // Logical Samza system name wired to the embedded Kafka broker via TEST_SYSTEM_FACTORY.
  private static final String TEST_SYSTEM = "TestSystemName";
  private static final String TEST_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
  private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
  private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
  private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
  private static final String TEST_JOB_NAME = "test-job";
  // Fixed processor ids; tests assume processors join the group in this order (index 0 becomes leader).
  private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};

  // Per-test state, re-created in setUp() with a unique UUID so runs never share topics or ZK paths.
  private String inputKafkaTopic;
  private String outputKafkaTopic;
  private ZkUtils zkUtils;
  private ApplicationConfig applicationConfig1;
  private ApplicationConfig applicationConfig2;
  private ApplicationConfig applicationConfig3;
  private LocalApplicationRunner applicationRunner1;
  private LocalApplicationRunner applicationRunner2;
  private LocalApplicationRunner applicationRunner3;

  // Set 90 seconds as max execution time for each test.
  @Rule
  public Timeout testTimeOutInMillis = new Timeout(90000);

  @Rule
  public final ExpectedException expectedException = ExpectedException.none();
104
105 @Override
106 public void setUp() {
107 super.setUp();
108 String uniqueTestId = UUID.randomUUID().toString();
109 String testStreamAppName = String.format("test-app-name-%s", uniqueTestId);
110 String testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
111 inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
112 outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
113 ZkClient zkClient = new ZkClient(zkConnect());
114 ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId));
115 zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
116 zkUtils.connect();
117
118 // Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId.
119 applicationConfig1 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId);
120 applicationConfig2 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId);
121 applicationConfig3 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[2], testStreamAppName, testStreamAppId);
122
123 // Create local application runners.
124 applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
125 applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
126 applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
127
128 for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
129 LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
130 TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new Properties());
131 }
132 }
133
134 @Override
135 public void tearDown() {
136 for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
137 LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
138 AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
139 }
140 zkUtils.close();
141 super.tearDown();
142 }
143
144 private void publishKafkaEvents(String topic, int numEvents, String streamProcessorId) {
145 KafkaProducer producer = getKafkaProducer();
146 for (int eventIndex = 0; eventIndex < numEvents; eventIndex++) {
147 try {
148 LOGGER.info("Publish kafka event with index : {} for stream processor: {}.", eventIndex, streamProcessorId);
149 producer.send(new ProducerRecord(topic, new TestKafkaEvent(streamProcessorId, String.valueOf(eventIndex)).toString().getBytes()));
150 } catch (Exception e) {
151 LOGGER.error("Publishing to kafka topic: {} resulted in exception: {}.", new Object[]{topic, e});
152 throw new SamzaException(e);
153 }
154 }
155 }
156
157 private ApplicationConfig buildStreamApplicationConfig(String systemName, String inputTopic,
158 String processorId, String appName, String appId) {
159 Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
160 .put(TaskConfig.INPUT_STREAMS(), inputTopic)
161 .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
162 .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
163 .put(ZkConfig.ZK_CONNECT, zkConnect())
164 .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
165 .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
166 .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
167 .put(JobConfig.PROCESSOR_ID(), processorId)
168 .put(ApplicationConfig.APP_NAME, appName)
169 .put(ApplicationConfig.APP_ID, appId)
170 .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
171 .put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
172 .build();
173 Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
174 applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
175 return new ApplicationConfig(new MapConfig(applicationConfig));
176 }
177
  @Test
  public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
    // Set up kafka topics.
    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

    // Create stream applications. Each per-processor latch fires on that processor's first message;
    // the shared latch counts total events consumed across all processors.
    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
    CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);

    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
    StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch);

    // Run stream applications.
    applicationRunner1.run(streamApp1);
    applicationRunner2.run(streamApp2);
    applicationRunner3.run(streamApp3);

    // Wait until all processors have processed a message.
    processedMessagesLatch1.await();
    processedMessagesLatch2.await();
    processedMessagesLatch3.await();

    // Verifications before killing the leader: all three processors are in the job model, and the
    // first generated job model version is "1".
    JobModel jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
    String prevJobModelVersion = zkUtils.getJobModelVersion();
    assertEquals(3, jobModel.getContainers().size());
    assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet());
    assertEquals("1", prevJobModelVersion);

    List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));

    assertEquals(3, processorIdsFromZK.size());
    assertEquals(PROCESSOR_IDS[0], processorIdsFromZK.get(0));

    // Kill the leader. Since streamApp1 is the first to join the cluster, it's the leader.
    applicationRunner1.kill(streamApp1);
    kafkaEventsConsumedLatch.await();

    // Verifications after killing the leader: the two survivors form a new group, a new leader is
    // elected, and a new job model (version "2") without processor 0 is generated.
    assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp1));
    processorIdsFromZK = zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]));
    assertEquals(2, processorIdsFromZK.size());
    assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
    jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
    assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
    String currentJobModelVersion = zkUtils.getJobModelVersion();
    assertEquals(2, jobModel.getContainers().size());
    assertEquals("2", currentJobModelVersion);
  }
230
  @Test
  public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() throws InterruptedException {
    // Set up kafka topics.
    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

    // Create StreamApplications.
    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);

    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);

    // Run stream applications.
    applicationRunner1.run(streamApp1);
    applicationRunner2.run(streamApp2);

    // Wait for message processing to start in both the processors.
    processedMessagesLatch1.await();
    processedMessagesLatch2.await();

    // Deliberately reuse applicationConfig2 so this runner registers with processor id
    // PROCESSOR_IDS[1], which is already taken by the live streamApp2 processor.
    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(new MapConfig(applicationConfig2));

    // Create a stream app with same processor id as SP2 and run it. It should fail.
    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
    kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
    StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch);
    // Fail when the duplicate processor joins; nothing after run() executes.
    expectedException.expect(SamzaException.class);
    applicationRunner3.run(streamApp3);
  }
262
263 // Depends upon SAMZA-1302
264 // @Test(expected = Exception.class)
265 public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
266 // Create StreamApplications.
267 StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, null);
268 StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, null);
269
270 // Run stream applications.
271 applicationRunner1.run(streamApp1);
272 applicationRunner2.run(streamApp2);
273
274 applicationRunner1.kill(streamApp1);
275 applicationRunner1.waitForFinish();
276 assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp2));
277
278 // Kill zookeeper server and expect job model regeneration and ZK fencing will fail with exception.
279 zookeeper().shutdown();
280
281 applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
282 applicationRunner1.run(streamApp1);
283 // This line should throw exception since Zookeeper is unavailable.
284 applicationRunner1.waitForFinish();
285 }
286
287 // Depends upon SAMZA-1302
288 // @Test
  public void testRollingUpgrade() throws Exception {
    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

    // Record every event processor 1 sees, so we can find the highest event id it processed
    // before the "upgrade" (kill + restart).
    List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
    StreamApplicationCallback streamApplicationCallback = messagesProcessed::add;

    // Create StreamApplication from configuration.
    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);

    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch);
    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);

    // Run stream application.
    applicationRunner1.run(streamApp1);
    applicationRunner2.run(streamApp2);

    processedMessagesLatch1.await();
    processedMessagesLatch2.await();

    // Read job model before rolling upgrade.
    String jobModelVersion = zkUtils.getJobModelVersion();
    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);

    applicationRunner1.kill(streamApp1);
    applicationRunner1.waitForFinish();

    // Highest event id processor 1 processed before being "upgraded".
    int lastProcessedMessageId = -1;
    for (TestKafkaEvent message : messagesProcessed) {
      lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventId()));
    }

    // Restart processor 1 with the same config, simulating the upgraded instance rejoining.
    LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
    applicationRunner4.run(streamApp1);
    applicationRunner4.waitForFinish();

    // Kill both the stream applications.
    applicationRunner4.kill(streamApp1);
    applicationRunner4.waitForFinish();
    applicationRunner2.kill(streamApp2);
    applicationRunner2.waitForFinish();

    // Read new job model after rolling upgrade.
    String newJobModelVersion = zkUtils.getJobModelVersion();
    JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);

    // This should be continuation of last processed message.
    // NOTE(review): messagesProcessed.get(0) is the FIRST event recorded by the callback, which the
    // restarted processor also appends to; assumes checkpointed resume past lastProcessedMessageId — verify.
    int nextSeenMessageId = Integer.parseInt(messagesProcessed.get(0).getEventId());
    assertTrue(lastProcessedMessageId < nextSeenMessageId);

    // Assertions on job model read from zookeeper.
    // NOTE(review): these assertions look mutually inconsistent — jobModel is asserted equal to
    // newJobModel even though their versions are asserted different, and processorIdsFromZK is
    // asserted to have size 3 but then compared to a 2-element list. Test is disabled (SAMZA-1302);
    // reconcile before re-enabling.
    assertFalse(newJobModelVersion.equals(jobModelVersion));
    assertEquals(jobModel, newJobModel);
    assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
    String currentJobModelVersion = zkUtils.getJobModelVersion();
    List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
    assertEquals(3, processorIdsFromZK.size());
    assertEquals(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]), processorIdsFromZK);
    assertEquals(2, jobModel.getContainers().size());
    assertEquals("2", currentJobModelVersion);
  }
351
352 public interface StreamApplicationCallback {
353 void onMessageReceived(TestKafkaEvent message);
354 }
355
356 private static class TestKafkaEvent implements Serializable {
357
358 // Actual content of the event.
359 private String eventData;
360
361 // Contains Integer value, which is greater than previous message id.
362 private String eventId;
363
364 TestKafkaEvent(String eventId, String eventData) {
365 this.eventData = eventData;
366 this.eventId = eventId;
367 }
368
369 String getEventId() {
370 return eventId;
371 }
372
373 String getEventData() {
374 return eventData;
375 }
376
377 @Override
378 public String toString() {
379 return eventId + "|" + eventData;
380 }
381
382 static TestKafkaEvent fromString(String message) {
383 String[] messageComponents = message.split("|");
384 return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
385 }
386 }
387
388 /**
389 * Publishes all input events to output topic(has no processing logic)
390 * and triggers {@link StreamApplicationCallback} with each received event.
391 **/
392 private static class TestStreamApplication implements StreamApplication {
393
394 private final String inputTopic;
395 private final String outputTopic;
396 private final CountDownLatch processedMessagesLatch;
397 private final StreamApplicationCallback streamApplicationCallback;
398 private final CountDownLatch kafkaEventsConsumedLatch;
399
400 TestStreamApplication(String inputTopic, String outputTopic,
401 CountDownLatch processedMessagesLatch,
402 StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) {
403 this.inputTopic = inputTopic;
404 this.outputTopic = outputTopic;
405 this.processedMessagesLatch = processedMessagesLatch;
406 this.streamApplicationCallback = streamApplicationCallback;
407 this.kafkaEventsConsumedLatch = kafkaEventsConsumedLatch;
408 }
409
410 @Override
411 public void init(StreamGraph graph, Config config) {
412 MessageStream<String> inputStream = graph.getInputStream(inputTopic, (key, msg) -> {
413 TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
414 if (streamApplicationCallback != null) {
415 streamApplicationCallback.onMessageReceived(incomingMessage);
416 }
417 if (processedMessagesLatch != null) {
418 processedMessagesLatch.countDown();
419 }
420 if (kafkaEventsConsumedLatch != null) {
421 kafkaEventsConsumedLatch.countDown();
422 }
423 return incomingMessage.toString();
424 });
425 OutputStream<String, String, String> outputStream = graph.getOutputStream(outputTopic, event -> null, event -> event);
426 inputStream.sendTo(outputStream);
427 }
428 }
429 }