SAMZA-1211; Remove Thread.sleep() from TestJoinOperator tests
authorPrateek Maheshwari <pmaheshw@linkedin.com>
Mon, 17 Apr 2017 19:09:25 +0000 (12:09 -0700)
committervjagadish1989 <jvenkatr@linkedin.com>
Mon, 17 Apr 2017 19:09:25 +0000 (12:09 -0700)
Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #123 from prateekm/join-test-no-sleep

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java

index 709f2a0..8e492dc 100644 (file)
@@ -180,7 +180,7 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
     } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
index b2948a3..e4cb9c2 100644 (file)
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -29,12 +31,10 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream
  * with buffered messages of type {@code JM} in the other stream.
@@ -51,21 +51,23 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
   private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
   private final long ttlMs;
   private final int opId;
+  private final Clock clock;
 
   PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, MessageStreamImpl<M> source,
-      Config config, TaskContext context) {
+      Config config, TaskContext context, Clock clock) {
     this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn();
     this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn();
     this.ttlMs = partialJoinOperatorSpec.getTtlMs();
     this.opId = partialJoinOperatorSpec.getOpId();
+    this.clock = clock;
   }
 
   @Override
   public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
     K key = thisPartialJoinFn.getKey(message);
-    thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, System.currentTimeMillis()));
+    thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis()));
     PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key);
-    long now = System.currentTimeMillis();
+    long now = clock.currentTimeMillis();
     if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) {
       RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage());
       this.propagateResult(joinResult, collector, coordinator);
@@ -74,7 +76,7 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
 
   @Override
   public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    long now = System.currentTimeMillis();
+    long now = clock.currentTimeMillis();
 
     KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState();
     KeyValueIterator<K, PartialJoinMessage<M>> iterator = thisState.all();
@@ -92,7 +94,7 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
     iterator.close();
     thisState.deleteAll(keysToRemove);
 
-    LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, System.currentTimeMillis() - now);
+    LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now);
   }
 
 }
index 1180179..1135726 100644 (file)
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableSet;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
@@ -33,13 +37,11 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.TestClock;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
 import org.junit.Test;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -51,7 +53,7 @@ public class TestJoinOperator {
 
   @Test
   public void join() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -67,7 +69,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -82,7 +84,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatch() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -96,7 +98,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatchReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -110,7 +112,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKey() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -127,7 +129,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKeyReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -144,7 +146,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessages() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -166,7 +168,7 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessagesReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock());
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
@@ -188,14 +190,15 @@ public class TestJoinOperator {
 
   @Test
   public void joinRemovesExpiredMessages() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    TestClock testClock = new TestClock();
+    StreamOperatorTask sot = createStreamOperatorTask(testClock);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
 
-    Thread.sleep(100); // 10 * ttl for join
+    testClock.advanceTime(100);
     sot.window(messageCollector, taskCoordinator); // should expire first stream messages
 
     // push messages to second stream with same key
@@ -207,14 +210,15 @@ public class TestJoinOperator {
 
   @Test
   public void joinRemovesExpiredMessagesReverse() throws Exception {
-    StreamOperatorTask sot = createStreamOperatorTask();
+    TestClock testClock = new TestClock();
+    StreamOperatorTask sot = createStreamOperatorTask(testClock);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
 
     // push messages to second stream
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
 
-    Thread.sleep(100); // 10 * ttl for join
+    testClock.advanceTime(100); // 10 * ttl for join
     sot.window(messageCollector, taskCoordinator); // should expire second stream messages
 
     // push messages to first stream with same key
@@ -223,7 +227,7 @@ public class TestJoinOperator {
     assertTrue(output.isEmpty());
   }
 
-  private StreamOperatorTask createStreamOperatorTask() throws Exception {
+  private StreamOperatorTask createStreamOperatorTask(Clock clock) throws Exception {
     ApplicationRunner runner = mock(ApplicationRunner.class);
     when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
     when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));
@@ -235,7 +239,7 @@ public class TestJoinOperator {
     Config config = mock(Config.class);
 
     StreamApplication sgb = new TestStreamApplication();
-    StreamOperatorTask sot = new StreamOperatorTask(sgb, runner);
+    StreamOperatorTask sot = new StreamOperatorTask(sgb, runner, clock);
     sot.init(config, taskContext);
     return sot;
   }