SAMZA-1249: Fix equality for WindowKey for Non-keyed tumbling windows
authorvjagadish1989 <jvenkatr@linkedin.com>
Mon, 1 May 2017 20:57:25 +0000 (13:57 -0700)
committerJacob Maes <jmaes@linkedin.com>
Mon, 1 May 2017 20:57:25 +0000 (13:57 -0700)
- Fix a `ClassCastException` and an NPE when using Tumbling window without keys
- Fix equality and hashCode for `WindowKey`
- Refactor the `TestWindowOperator` unit tests using simpler types and a mock `MessageCollector`.

More details in `SAMZA-1249`

Author: vjagadish1989 <jvenkatr@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>, Jacob Maes <jmaes@linkedin.com>

Closes #149 from vjagadish1989/samza-1249

samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java

index bf52724..a1e7774 100644 (file)
@@ -65,21 +65,15 @@ public class WindowKey<K> {
 
     WindowKey<?> windowKey = (WindowKey<?>) o;
 
-    if (!key.equals(windowKey.key)) return false;
-
-    if (paneId == null) {
-      return windowKey.paneId == null;
-    }
-
-    return paneId.equals(windowKey.paneId);
+    if (key != null ? !key.equals(windowKey.key) : windowKey.key != null) return false;
+    return !(paneId != null ? !paneId.equals(windowKey.paneId) : windowKey.paneId != null);
 
   }
 
   @Override
   public int hashCode() {
-    int result = key.hashCode();
+    int result = key != null ? key.hashCode() : 0;
     result = 31 * result + (paneId != null ? paneId.hashCode() : 0);
     return result;
   }
-
 }
index 721b4c0..a0269cd 100644 (file)
@@ -178,7 +178,7 @@ public final class Windows {
    */
   public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue,
                                                            FoldLeftFunction<? super M, WV> foldFn) {
-    Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
+    Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
     return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
         null, null, WindowType.TUMBLING);
   }
index 597244e..ca8a151 100644 (file)
@@ -35,7 +35,9 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
@@ -55,9 +57,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestWindowOperator {
-  private final MessageCollector messageCollector = mock(MessageCollector.class);
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
   private Config config;
   private TaskContext taskContext;
@@ -65,15 +65,13 @@ public class TestWindowOperator {
 
   @Before
   public void setup() throws Exception {
-    windowPanes.clear();
-
     config = mock(Config.class);
     taskContext = mock(TaskContext.class);
     runner = mock(ApplicationRunner.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka"));
+    when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
   }
 
   @Test
@@ -81,11 +79,13 @@ public class TestWindowOperator {
 
     StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
-
-    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
 
     task.window(messageCollector, taskCoordinator);
@@ -107,14 +107,42 @@ public class TestWindowOperator {
   }
 
   @Test
+  public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    Assert.assertEquals(windowPanes.size(), 0);
+
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+    Assert.assertEquals(windowPanes.size(), 0);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    Assert.assertEquals(windowPanes.size(), 0);
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
+  }
+
+
+  @Test
   public void testTumblingWindowsAccumulatingMode() throws Exception {
     StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
 
@@ -136,11 +164,12 @@ public class TestWindowOperator {
   public void testSessionWindowsDiscardingMode() throws Exception {
     StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
     TestClock testClock = new TestClock();
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
 
@@ -148,10 +177,10 @@ public class TestWindowOperator {
     Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
     Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
 
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -164,8 +193,8 @@ public class TestWindowOperator {
     Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
     Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -182,17 +211,20 @@ public class TestWindowOperator {
         Duration.ofMillis(500));
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
 
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
@@ -212,16 +244,18 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 1);
     Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
     Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
     Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
 
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
     Assert.assertEquals(windowPanes.size(), 1);
 
@@ -233,7 +267,7 @@ public class TestWindowOperator {
     Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
     Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
 
-    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofSeconds(1));
     task.window(messageCollector, taskCoordinator);
 
@@ -253,8 +287,10 @@ public class TestWindowOperator {
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //assert that the count trigger fired
     Assert.assertEquals(windowPanes.size(), 1);
 
@@ -264,9 +300,9 @@ public class TestWindowOperator {
     //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
     Assert.assertEquals(windowPanes.size(), 1);
 
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
     //advance timer by 500 more millis to enable the default trigger
     testClock.advanceTime(Duration.ofMillis(500));
@@ -279,7 +315,7 @@ public class TestWindowOperator {
     Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
     Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
 
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
     //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
     testClock.advanceTime(Duration.ofMillis(500));
@@ -304,28 +340,32 @@ public class TestWindowOperator {
 
     StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
         Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+    MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
     TestClock testClock = new TestClock();
     StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
     task.init(config, taskContext);
 
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //assert that the count trigger fired
     Assert.assertEquals(windowPanes.size(), 1);
 
     //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     testClock.advanceTime(Duration.ofMillis(500));
     //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
     task.window(messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 2);
 
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     Assert.assertEquals(windowPanes.size(), 3);
 
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     //advance timer by 500 more millis to enable the default trigger
     testClock.advanceTime(Duration.ofMillis(500));
     task.window(messageCollector, taskCoordinator);
@@ -337,10 +377,11 @@ public class TestWindowOperator {
 
     private final AccumulationMode mode;
     private final Duration duration;
-    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
 
     KeyedTumblingWindowStreamApplication(AccumulationMode mode,
-        Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+        Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
       this.mode = mode;
       this.duration = timeDuration;
       this.earlyTrigger = earlyTrigger;
@@ -348,68 +389,79 @@ public class TestWindowOperator {
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
-          (k, m) -> new MessageEnvelope(k, m));
-      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
+          (k, m) -> new IntegerEnvelope((Integer) k));
+      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
         .map(m -> m)
         .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
           .setAccumulationMode(mode))
-        .map(m -> {
-            windowPanes.add(m);
-            return m;
-          });
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+            });
     }
   }
 
-  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+  private class TumblingWindowStreamApplication implements StreamApplication {
 
     private final AccumulationMode mode;
     private final Duration duration;
+    private final Trigger<IntegerEnvelope> earlyTrigger;
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
 
-    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+    TumblingWindowStreamApplication(AccumulationMode mode,
+                                         Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
       this.mode = mode;
-      this.duration = duration;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
     }
 
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
-          (k, m) -> new MessageEnvelope(k, m));
-      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
-
+      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
+          (k, m) -> new IntegerEnvelope((Integer) k));
+      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
       inStream
           .map(m -> m)
-          .window(Windows.keyedSessionWindow(keyFn, duration)
+          .window(Windows.<IntegerEnvelope>tumblingWindow(duration).setEarlyTrigger(earlyTrigger)
               .setAccumulationMode(mode))
-          .map(m -> {
-              windowPanes.add(m);
-              return m;
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
             });
     }
   }
 
-  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
-    IntegerMessageEnvelope(int key, int msg) {
-      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
-    }
-  }
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
 
-  private class MessageEnvelope<K, V> {
-    private final K key;
-    private final V value;
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
 
-    MessageEnvelope(K key, V value) {
-      this.key = key;
-      this.value = value;
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.mode = mode;
+      this.duration = duration;
     }
 
-    public K getKey() {
-      return key;
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
+          (k, m) -> new IntegerEnvelope((Integer) k));
+      Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
+
+      inStream
+          .map(m -> m)
+          .window(Windows.keyedSessionWindow(keyFn, duration)
+              .setAccumulationMode(mode))
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+            });
     }
+  }
+
+  private class IntegerEnvelope extends IncomingMessageEnvelope  {
 
-    public V getValue() {
-      return value;
+    IntegerEnvelope(Integer key) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
     }
   }
 }