SAMZA-1312: Add Control Messages and Intermediate Stream Serde
author Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 13 Jun 2017 16:46:02 +0000 (09:46 -0700)
committer Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 13 Jun 2017 16:46:02 +0000 (09:46 -0700)
commit 3375c7116ef68d845b4f96cf5d1b9291397d79ec
tree 3aa405989023d34d9e1dd3d5e6984ef8a302138a
parent ae29a40d0f6a46a6a3af71f85ca025b0932c95c8
SAMZA-1312: Add Control Messages and Intermediate Stream Serde

This patch adds the control message types, which include:
* EndOfStreamMessage
* WatermarkMessage

To support in-band data and control messages, we provide a wrapper serde (IntermediateMessageSerde) that serializes and deserializes data and control messages based on the message type byte, which is the first byte of each intermediate stream message. The message format is defined in SAMZA-1312. The patch also integrates this serde with SerdeManager.
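
The type-byte framing described above can be illustrated with a minimal, self-contained sketch. This is not the code in IntermediateMessageSerde: the class name, the method names, the numeric type codes, and the payload encodings (UTF-8 text for data, an 8-byte long for a watermark) are all assumptions made for illustration. The actual values and layout are defined in SAMZA-1312.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of type-byte framing; not the Samza implementation.
class TypeByteSerdeSketch {
    // Assumed type codes, chosen for illustration only.
    static final byte USER_MESSAGE = 0;
    static final byte WATERMARK = 1;
    static final byte END_OF_STREAM = 2;

    // Prepend the type byte to an already-serialized payload.
    static byte[] wrap(byte type, byte[] payload) {
        byte[] out = new byte[payload.length + 1];
        out[0] = type;
        System.arraycopy(payload, 0, out, 1, payload.length);
        return out;
    }

    // Read the type byte (the first byte of the framed message).
    static byte typeOf(byte[] message) {
        if (message == null || message.length == 0) {
            throw new IllegalArgumentException("empty message");
        }
        return message[0];
    }

    // Return everything after the type byte.
    static byte[] payloadOf(byte[] message) {
        typeOf(message); // rejects null or empty input
        return Arrays.copyOfRange(message, 1, message.length);
    }

    // Dispatch on the type byte, as a wrapper serde would before
    // delegating to the data serde or a control-message serde.
    static String describe(byte[] message) {
        byte[] payload = payloadOf(message);
        switch (typeOf(message)) {
            case USER_MESSAGE:
                return "data:" + new String(payload, StandardCharsets.UTF_8);
            case WATERMARK:
                return "watermark:" + ByteBuffer.wrap(payload).getLong();
            case END_OF_STREAM:
                return "end-of-stream";
            default:
                throw new IllegalArgumentException("unknown type byte " + message[0]);
        }
    }

    public static void main(String[] args) {
        byte[] data = wrap(USER_MESSAGE, "hello".getBytes(StandardCharsets.UTF_8));
        byte[] wm = wrap(WATERMARK, ByteBuffer.allocate(Long.BYTES).putLong(42L).array());
        byte[] eos = wrap(END_OF_STREAM, new byte[0]);
        System.out.println(describe(data));
        System.out.println(describe(wm));
        System.out.println(describe(eos));
    }
}
```

Because the type travels in the message itself, a consumer of the intermediate stream can tell data from control messages without a separate channel, at a cost of one extra byte per message.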

Tested with example jobs deployed locally; they work as expected.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jagadish V <jagadish@apache.org>

Closes #207 from xinyuiscool/SAMZA-1312
13 files changed:
samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
samza-core/src/main/java/org/apache/samza/execution/JobNode.java
samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
samza-core/src/main/java/org/apache/samza/message/ControlMessage.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/message/MessageType.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala