SAMZA-1321: Propagate end-of-stream and watermark messages
author Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Thu, 29 Jun 2017 00:16:10 +0000 (17:16 -0700)
committer Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Thu, 29 Jun 2017 00:16:10 +0000 (17:16 -0700)
commit bb3007d69ae177791aeb00ba5b17fd937628b2b8
tree ea44b4d2584dba1b0ca07989e5b56b8e8547c4cc
parent 6815a4d44da8e9785ae5e9b27ab1e0aeb7dc43a8

This patch completes the end-of-stream workflow across multi-stage pipelines. It also contains the initial commit for watermark support. Issues with watermarks raised in the review feedback will be addressed in follow-up PRs. The main logic this patch adds:

- EndOfStreamManager aggregates the end-of-stream control messages and propagates the result to downstream intermediate topics based on the topology of the IO in the StreamGraph.

- WatermarkManager aggregates the watermark control messages from the upstream-stage tasks, passes them through the operators, and propagates them downstream.
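
The aggregation step described above can be illustrated with a minimal sketch. It is not the EndOfStreamManager code from this patch: the class name, the method name, and the assumption that a stream is finished once a fixed number of upstream tasks have each reported end-of-stream are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the API added by this patch.
final class EndOfStreamAggregatorSketch {
    // Assumed: the number of upstream tasks that must report per stream.
    private final int expectedTaskCount;
    // Stream name -> names of the upstream tasks that already reported end-of-stream.
    private final Map<String, Set<String>> reported = new HashMap<>();

    EndOfStreamAggregatorSketch(int expectedTaskCount) {
        this.expectedTaskCount = expectedTaskCount;
    }

    // Records one end-of-stream message. Returns true when the number of
    // distinct tasks that have reported for this stream equals the expected count.
    boolean onEndOfStream(String stream, String taskName) {
        Set<String> tasks = reported.computeIfAbsent(stream, k -> new HashSet<>());
        tasks.add(taskName);
        return tasks.size() == expectedTaskCount;
    }
}
```

In this sketch a caller would forward the end-of-stream message to the downstream intermediate topics only when `onEndOfStream` returns true. Tracking task names in a set keeps duplicate messages from the same task from being counted twice.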

In the operator implementation, I implemented watermark propagation logic similar to Beam's:
* InputWatermark(op) = min { OutputWatermark(op') | op' is upstream of op}
* OutputWatermark(op) = min { InputWatermark(op), OldestWorkTime(op) }
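
As a rough illustration of the two rules above, here is a small sketch. The class and method names are hypothetical and this is not the OperatorImpl code from the patch. It assumes watermarks are `long` timestamps, and it assumes that an operator with no upstream operators gets an unbounded input watermark (`Long.MAX_VALUE`).

```java
import java.util.List;

// Hypothetical illustration of the two watermark rules; not the patch's code.
final class WatermarkSketch {
    private WatermarkSketch() { }

    // InputWatermark(op): the minimum output watermark over all upstream operators.
    // Assumption: with no upstream operators, the result is Long.MAX_VALUE.
    static long inputWatermark(List<Long> upstreamOutputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : upstreamOutputWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    // OutputWatermark(op): the smaller of the input watermark and the
    // timestamp of the oldest work still pending in the operator.
    static long outputWatermark(long inputWatermark, long oldestWorkTime) {
        return Math.min(inputWatermark, oldestWorkTime);
    }
}
```

For example, with upstream output watermarks 30, 10, and 20, the input watermark is 10. If the operator still holds work with timestamp 7, its output watermark is 7.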

Added a number of unit tests and integration tests. The code is 100% covered, as reported by IntelliJ. Both control messages work as expected.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Yi Pan <nickpan47@gmail.com>

Closes #236 from xinyuiscool/SAMZA-1321
44 files changed:
samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/control/IOGraph.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/control/Watermark.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/util/TestStreamConsumer.java [new file with mode: 0644]