[SPARK-24713] AppMatser of spark streaming kafka OOM if there are hund…
authorYuanbo Liu <yuanbo@Yuanbos-MacBook-Air.local>
Fri, 13 Jul 2018 13:37:24 +0000 (07:37 -0600)
committercody koeninger <cody@koeninger.org>
Fri, 13 Jul 2018 13:37:24 +0000 (07:37 -0600)
We have hundreds of kafka topics need to be consumed in one application. The application master will throw OOM exception after hanging for nearly half of an hour.

OOM happens in the env with a lot of topics, and it's not convenient to set up such kind of env in the unit test. So I didn't change/add test case.

Author: Yuanbo Liu <yuanbo@Yuanbos-MacBook-Air.local>
Author: yuanbo <yuanbo@apache.org>

Closes #21690 from yuanboliu/master.

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

index c322148..0246006 100644 (file)
@@ -166,6 +166,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
    * which would throw off consumer position.  Fix position if this happens.
    */
   private def paranoidPoll(c: Consumer[K, V]): Unit = {
+    // don't actually want to consume any messages, so pause all partitions
+    c.pause(c.assignment())
     val msgs = c.poll(0)
     if (!msgs.isEmpty) {
       // position should be minimum offset per topicpartition
@@ -204,8 +206,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
     // position for new partitions determined by auto.offset.reset if no commit
     currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
 
-    // don't want to consume messages, so pause
-    c.pause(newPartitions.asJava)
     // find latest available offsets
     c.seekToEnd(currentOffsets.keySet.asJava)
     parts.map(tp => tp -> c.position(tp)).toMap
@@ -262,9 +262,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
         tp -> c.position(tp)
       }.toMap
     }
-
-    // don't actually want to consume any messages, so pause all partitions
-    c.pause(currentOffsets.keySet.asJava)
   }
 
   override def stop(): Unit = this.synchronized {