SAMZA-1384: Race condition with async commit affects checkpoint correctness
[samza.git] / samza-core / src / main / scala / org / apache / samza / container / TaskInstance.scala
index 84e993b..65fefda 100644 (file)
@@ -206,6 +206,8 @@ class TaskInstance(
   def commit {
     metrics.commits.inc
 
+    val checkpoint = offsetManager.buildCheckpoint(taskName)
+
     trace("Flushing producers for taskName: %s" format taskName)
 
     collector.flush
@@ -218,7 +220,7 @@ class TaskInstance(
 
     trace("Checkpointing offsets for taskName: %s" format taskName)
 
-    offsetManager.checkpoint(taskName)
+    offsetManager.writeCheckpoint(taskName, checkpoint)
   }
 
   def shutdownTask {