SAMZA-1384: Race condition with async commit affects checkpoint correctness
[samza.git] / samza-core / src / test / java / org / apache / samza / task / TestAsyncRunLoop.java
index 1afc26a..6694f26 100644 (file)
@@ -50,8 +50,7 @@ import scala.Option;
 import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -368,8 +367,10 @@ public class TestAsyncRunLoop {
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
-    verify(offsetManager).checkpoint(taskName0);
-    verify(offsetManager, never()).checkpoint(taskName1);
+    verify(offsetManager).buildCheckpoint(taskName0);
+    verify(offsetManager).writeCheckpoint(taskName0, any());
+    verify(offsetManager, never()).buildCheckpoint(taskName1);
+    verify(offsetManager, never()).writeCheckpoint(taskName1, any());
   }
 
   //@Test
@@ -398,8 +399,10 @@ public class TestAsyncRunLoop {
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
-    verify(offsetManager).checkpoint(taskName0);
-    verify(offsetManager).checkpoint(taskName1);
+    verify(offsetManager).buildCheckpoint(taskName0);
+    verify(offsetManager).writeCheckpoint(taskName0, any());
+    verify(offsetManager).buildCheckpoint(taskName1);
+    verify(offsetManager).writeCheckpoint(taskName1, any());
   }
 
   //@Test
@@ -552,8 +555,10 @@ public class TestAsyncRunLoop {
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
-    verify(offsetManager).checkpoint(taskName0);
-    verify(offsetManager).checkpoint(taskName1);
+    verify(offsetManager).buildCheckpoint(taskName0);
+    verify(offsetManager).writeCheckpoint(taskName0, any());
+    verify(offsetManager).buildCheckpoint(taskName1);
+    verify(offsetManager).writeCheckpoint(taskName1, any());
   }
 
   // TODO: Add assertions.
@@ -641,7 +646,8 @@ public class TestAsyncRunLoop {
           secondMsgCompletionLatch.countDown();
           // OffsetManager.update with firstMsg offset, task.commit has happened when second message callback has not completed.
           verify(offsetManager).update(taskName0, firstMsg.getSystemStreamPartition(), firstMsg.getOffset());
-          verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
+          verify(offsetManager, atLeastOnce()).buildCheckpoint(taskName0);
+          verify(offsetManager, atLeastOnce()).writeCheckpoint(taskName0, any());
         }
       } catch (Exception e) {
         e.printStackTrace();