* which would throw off consumer position. Fix position if this happens.
*/
private def paranoidPoll(c: Consumer[K, V]): Unit = {
+ // don't actually want to consume any messages, so pause all partitions
+ c.pause(c.assignment())
val msgs = c.poll(0)
if (!msgs.isEmpty) {
// position should be minimum offset per topicpartition
// position for new partitions determined by auto.offset.reset if no commit
currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
- // don't want to consume messages, so pause
- c.pause(newPartitions.asJava)
// find latest available offsets
c.seekToEnd(currentOffsets.keySet.asJava)
parts.map(tp => tp -> c.position(tp)).toMap
tp -> c.position(tp)
}.toMap
}
-
- // don't actually want to consume any messages, so pause all partitions
- c.pause(currentOffsets.keySet.asJava)
}
override def stop(): Unit = this.synchronized {