[SPARK-20168][STREAMING KINESIS] Setting the timestamp directly would cause exception...
authorYash Sharma <ysharma@atlassian.com>
Thu, 12 Jul 2018 17:04:47 +0000 (10:04 -0700)
committerSean Owen <srowen@gmail.com>
Thu, 12 Jul 2018 17:04:47 +0000 (10:04 -0700)
Setting the timestamp directly would cause exception on reading stream, it can be set directly only if the mode is not AT_TIMESTAMP

## What changes were proposed in this pull request?

The last patch in the kinesis streaming receiver sets the timestamp for the mode AT_TIMESTAMP, but this mode can only be set via the

`baseClientLibConfiguration.withTimestampAtInitialPositionInStream()
`
and can't be set directly using
`.withInitialPositionInStream()`

This patch fixes the issue.

## How was this patch tested?
Kinesis Receiver doesn't expose the internal state outside, so couldn't find the right way to test this change. Seeking for tips from other contributors here.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #21541 from yashs360/ysharma/fix_kinesis_bug.

external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

index fa0de62..69c5236 100644 (file)
@@ -160,7 +160,6 @@ private[kinesis] class KinesisReceiver[T](
         cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
         workerId)
         .withKinesisEndpoint(endpointUrl)
-        .withInitialPositionInStream(initialPosition.getPosition)
         .withTaskBackoffTimeMillis(500)
         .withRegionName(regionName)
 
@@ -169,7 +168,8 @@ private[kinesis] class KinesisReceiver[T](
       initialPosition match {
         case ts: AtTimestamp =>
           baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
-        case _ => baseClientLibConfiguration
+        case _ =>
+          baseClientLibConfiguration.withInitialPositionInStream(initialPosition.getPosition)
       }
     }