Fix flaky, slow integration tests in TestZkStreamProcessor and TestZkStreamProcessorS...
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Thu, 3 Aug 2017 22:48:23 +0000 (15:48 -0700)
committernavina <navina@apache.org>
Thu, 3 Aug 2017 22:48:23 +0000 (15:48 -0700)
Fix flaky and slow integration tests in TestZkStreamProcessor and TestZkStreamProcessorSession
Reason for failures:

There’re three configurable wait times in rebalancing phase in samza standalone before consensus is acheived and processing resumes with updated jobModel.

* debounceTime (Specified by `job.debounce.time.ms`. Upon processor change, leader waits for this interval before generating jobModel expecting stabilization in processors group(new arrival, deletion etc)).
* taskShutdownMs (Specified by `task.shutdown.ms`. Wait time for SamzaContainer shutdown in StreamProcessor).
* barrierWaitTimeOutMs (Specified by `job.coordinator.zk.consensus.timeout.ms`. Wait time for all processors in the group to join the barrier after creation).

Above wait times affects rebalancing phase duration. All these wait time have defaults in order of 40-60 seconds and not set to low values.

Flaky tests expects processors to come back up after rebalancing phase and drain message sources(Accomplished by checking a latch.count. RemoteApplicationRunner integration tests does exact same thing by checking if kafka input queue is drained directly with similar logic).

In worst case rebalancing phases can last upto 3-4 minutes(Making these tests sometime take 10 minutes at worst case).

Change:

Set all the above timeouts to 2 seconds(Sufficient for tests and verified by local build).

Benefits:

* Faster build time(Average runtime of these individual tests were reduced from 1m56s to 14s)
* More predicability in assertions(Didn’t fail even once in 30-40 attempts locally).

NOTE: If this doesn’t fix TestZkStreamProcessor and TestZkStreamProcessorSession,
longer term fix should be to use message markers in input source and
shutdown taskCoordinator upon receiving them from TaskImpl(Or use
bounded collection based pluggable InMemorySystemConsumer/InMemorySystemProducer).

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Bharath Kumarasubramanian <codin.martial@gmail.com>, Navina Ramesh <navina@apache.org>

Closes #260 from shanthoosh/FIX_ZK_PROCESSOR_FLAKY_TESTS

samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java

index f2f1585..c848cde 100644 (file)
@@ -39,8 +39,10 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -66,6 +68,12 @@ import org.slf4j.LoggerFactory;
 
 
 public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
+  private static final String TASK_SHUTDOWN_MS = "2000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "2000";
+  private static final String BARRIER_TIMEOUT_MS = "2000";
+  private static final String ZK_SESSION_TIMEOUT_MS = "2000";
+  private static final String ZK_CONNECTION_TIMEOUT_MS = "2000";
+
   public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
   public final static int BAD_MESSAGE_KEY = 1000;
   // to avoid long sleeps, we rather use multiple attempts with shorter sleeps
@@ -181,6 +189,11 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
     configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
 
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
+    configs.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+    configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
+    configs.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
+    configs.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS);
+    configs.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS);
 
     return configs;
   }