SAMZA-1870: hdfs offset comparator to handle end of stream offset
authorHai Lu <halu@linkedin.com>
Tue, 11 Sep 2018 16:57:59 +0000 (09:57 -0700)
committerxiliu <xiliu@linkedin.com>
Tue, 11 Sep 2018 16:57:59 +0000 (09:57 -0700)
This happens particularly when using HDFS as a bootstrap stream:

org.apache.samza.SamzaException: Invalid offset for MultiFileHdfsReader: END_OF_STREAM
at org.apache.samza.system.hdfs.reader.MultiFileHdfsReader.getCurFileIndex(MultiFileHdfsReader.java:64)
at org.apache.samza.system.hdfs.HdfsSystemAdmin.offsetComparator(HdfsSystemAdmin.java:224)
at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:274)
at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:787)
at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)

Author: Hai Lu <halu@linkedin.com>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes #633 from lhaiesp/master

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java

index 28a1bac..0d50f26 100644 (file)
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
@@ -221,6 +222,17 @@ public class HdfsSystemAdmin implements SystemAdmin {
     if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
       return null;
     }
+    /*
+     * Properly handle END_OF_STREAM offset here. If both are END_OF_STREAM,
+     * then they are equal. Otherwise END_OF_STREAM is always greater than any
+     * other offsets.
+     */
+    if (offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+      return offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET) ? 0 : 1;
+    }
+    if (offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+      return -1;
+    }
     int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
     int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
     if (fileIndex1 == fileIndex2) {