SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail
authorYi Pan (Data Infrastructure) <yipan@yipan-mn1.linkedin.biz>
Thu, 17 May 2018 05:19:37 +0000 (22:19 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Thu, 17 May 2018 05:19:37 +0000 (22:19 -0700)
Test locally and works.

Author: Yi Pan (Data Infrastructure) <yipan@yipan-mn1.linkedin.biz>

Reviewers: Jagadish <jagadish@apache.org>

Closes #523 from nickpan47/fix-unittest-deleted-messages

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala

index c76f6e5..a63db03 100644 (file)
@@ -583,8 +583,6 @@ class KafkaSystemAdmin(
     * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
     */
   override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
-    deleteMessagesCalled = true
-
     if (!running) {
       throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
     }
@@ -593,6 +591,7 @@ class KafkaSystemAdmin(
         (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
       }.toMap
       adminClient.deleteRecordsBefore(nextOffsets)
+      deleteMessagesCalled = true
     }
   }