SAMZA-1753: Added timestamp to Incoming message envelope.
authorBoris S <boryas@apache.org>
Mon, 25 Jun 2018 20:08:50 +0000 (13:08 -0700)
committerBoris S <bshkolnik@linkedin.com>
Mon, 25 Jun 2018 20:08:50 +0000 (13:08 -0700)
Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: xinyuiscool@apache.org

Closes #559 from sborya/kafkaTS

samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala

index 60a605b..4d0ce2f 100644 (file)
@@ -36,6 +36,7 @@ public class IncomingMessageEnvelope {
   private final Object key;
   private final Object message;
   private final int size;
+  private long timestamp = 0L;
 
   /**
    * Constructs a new IncomingMessageEnvelope from specified components.
@@ -66,6 +67,14 @@ public class IncomingMessageEnvelope {
     this.size = size;
   }
 
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   public SystemStreamPartition getSystemStreamPartition() {
     return systemStreamPartition;
   }
index 3a1ffe9..4cebb82 100644 (file)
@@ -279,9 +279,13 @@ private[kafka] class KafkaSystemConsumer(
       }
 
       if(fetchLimitByBytesEnabled ) {
-        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)))
+        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
+        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
+        put(systemStreamPartition, ime)
       } else {
-        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
+        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
+        put(systemStreamPartition, ime)
       }
 
       setIsAtHead(systemStreamPartition, isAtHead)