SAMZA-1732: Reduce the coordination timeouts in TestZkLocalApplicationRunner tests.
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Tue, 29 May 2018 23:26:25 +0000 (16:26 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Tue, 29 May 2018 23:26:25 +0000 (16:26 -0700)
Currently all the tests in TestZkLocalApplicationRunner takes around 5 minutes to finish. Reducing the coordination timeout to reduce the test time.

Changes in TestZkLocalApplicationRunner test timeout values:
* Change debounce timeout from 20 seconds to 2 seconds.
* Change task.shutdown timeout from 30 seconds to 2 seconds.
* Change barrier timeout from 40 seconds to 2 seconds.

After this change, execution time of TestZkLocalApplicationRunner tests has reduced from `310` seconds to `55` seconds.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #537 from shanthoosh/reduce_zk_localAppRunnerTestTime

samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

index 0b0a271..ea44052 100644 (file)
@@ -88,8 +88,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
   private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
-  private static final String TASK_SHUTDOWN_MS = "20000";
-  private static final String JOB_DEBOUNCE_TIME_MS = "30000";
+  private static final String TASK_SHUTDOWN_MS = "2000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "2000";
+  private static final String BARRIER_TIMEOUT_MS = "2000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
 
   private String inputKafkaTopic;
@@ -185,6 +186,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
       String appName, String appId) {
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
+        .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
         .put(TaskConfig.INPUT_STREAMS(), inputTopic)
         .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
         .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
@@ -489,11 +491,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
 
-    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig2);
+    MapConfig appConfig = new ApplicationConfig(new MapConfig(applicationConfig2, ImmutableMap.of(ZkConfig.ZK_SESSION_TIMEOUT_MS, "10")));
+    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(appConfig);
 
     // Create a stream app with same processor id as SP2 and run it. It should fail.
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
-    kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2);
     // Fail when the duplicate processor joins.
     expectedException.expect(SamzaException.class);
@@ -514,12 +515,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    /**
-     * Custom listeners can't be plugged in for transition events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion etc) from zkJobCoordinator. Only possible listeners
-     * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
-     */
     Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
-    configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
 
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new MapConfig(configMap);