SAMZA-1249: Fix equality for WindowKey for Non-keyed tumbling windows
[samza.git] / samza-core / src / test / java / org / apache / samza / operators / TestWindowOperator.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.operators;
21
22
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.ImmutableSet;
25 import junit.framework.Assert;
26 import org.apache.samza.Partition;
27 import org.apache.samza.application.StreamApplication;
28 import org.apache.samza.config.Config;
29 import org.apache.samza.metrics.MetricsRegistryMap;
30 import org.apache.samza.operators.triggers.FiringType;
31 import org.apache.samza.operators.triggers.Trigger;
32 import org.apache.samza.operators.triggers.Triggers;
33 import org.apache.samza.operators.windows.AccumulationMode;
34 import org.apache.samza.operators.windows.WindowPane;
35 import org.apache.samza.operators.windows.Windows;
36 import org.apache.samza.runtime.ApplicationRunner;
37 import org.apache.samza.system.IncomingMessageEnvelope;
38 import org.apache.samza.system.OutgoingMessageEnvelope;
39 import org.apache.samza.system.StreamSpec;
40 import org.apache.samza.system.SystemStream;
41 import org.apache.samza.system.SystemStreamPartition;
42 import org.apache.samza.task.MessageCollector;
43 import org.apache.samza.task.StreamOperatorTask;
44 import org.apache.samza.task.TaskContext;
45 import org.apache.samza.task.TaskCoordinator;
46 import org.apache.samza.testUtils.TestClock;
47 import org.junit.Before;
48 import org.junit.Test;
49
50 import java.time.Duration;
51 import java.util.ArrayList;
52 import java.util.Collection;
53 import java.util.List;
54 import java.util.function.Function;
55
56 import static org.mockito.Mockito.mock;
57 import static org.mockito.Mockito.when;
58
59 public class TestWindowOperator {
60 private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
61 private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
62 private Config config;
63 private TaskContext taskContext;
64 private ApplicationRunner runner;
65
66 @Before
67 public void setup() throws Exception {
68 config = mock(Config.class);
69 taskContext = mock(TaskContext.class);
70 runner = mock(ApplicationRunner.class);
71 when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
72 .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
73 when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
74 when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
75 }
76
77 @Test
78 public void testTumblingWindowsDiscardingMode() throws Exception {
79
80 StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
81 Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
82 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
83
84 TestClock testClock = new TestClock();
85 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
86 task.init(config, taskContext);
87 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
88 integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
89 testClock.advanceTime(Duration.ofSeconds(1));
90
91 task.window(messageCollector, taskCoordinator);
92 Assert.assertEquals(windowPanes.size(), 5);
93 Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
94 Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
95
96 Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
97 Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
98
99 Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
100 Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
101
102 Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
103 Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
104
105 Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
106 Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1);
107 }
108
109 @Test
110 public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
111
112 StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
113 Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
114 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
115
116 TestClock testClock = new TestClock();
117 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
118 task.init(config, taskContext);
119
120 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
121 Assert.assertEquals(windowPanes.size(), 0);
122
123 integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
124 Assert.assertEquals(windowPanes.size(), 0);
125
126 testClock.advanceTime(Duration.ofSeconds(1));
127 Assert.assertEquals(windowPanes.size(), 0);
128
129 task.window(messageCollector, taskCoordinator);
130 Assert.assertEquals(windowPanes.size(), 1);
131 Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
132 }
133
134
135 @Test
136 public void testTumblingWindowsAccumulatingMode() throws Exception {
137 StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
138 Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
139 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
140 TestClock testClock = new TestClock();
141 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
142 task.init(config, taskContext);
143
144 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
145 integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
146 testClock.advanceTime(Duration.ofSeconds(1));
147 task.window(messageCollector, taskCoordinator);
148
149 Assert.assertEquals(windowPanes.size(), 7);
150 Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
151 Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
152
153 Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
154 Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
155
156 Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
157 Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4);
158
159 Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
160 Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4);
161 }
162
163 @Test
164 public void testSessionWindowsDiscardingMode() throws Exception {
165 StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
166 TestClock testClock = new TestClock();
167 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
168 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
169 task.init(config, taskContext);
170 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
171 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
172 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
173 testClock.advanceTime(Duration.ofSeconds(1));
174 task.window(messageCollector, taskCoordinator);
175
176 Assert.assertEquals(windowPanes.size(), 1);
177 Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
178 Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
179
180 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
181 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
182 task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
183 task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
184
185 testClock.advanceTime(Duration.ofSeconds(1));
186 task.window(messageCollector, taskCoordinator);
187
188 Assert.assertEquals(windowPanes.size(), 3);
189 Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
190 Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
191 Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
192 Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
193 Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
194 Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
195
196 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
197 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
198
199 testClock.advanceTime(Duration.ofSeconds(1));
200 task.window(messageCollector, taskCoordinator);
201 Assert.assertEquals(windowPanes.size(), 4);
202 Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
203 Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
204 Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
205
206 }
207
208 @Test
209 public void testSessionWindowsAccumulatingMode() throws Exception {
210 StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
211 Duration.ofMillis(500));
212 TestClock testClock = new TestClock();
213 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
214 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
215
216 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
217 task.init(config, taskContext);
218
219 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
220 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
221 testClock.advanceTime(Duration.ofSeconds(1));
222
223 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
224 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
225
226 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
227 task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
228
229 testClock.advanceTime(Duration.ofSeconds(1));
230 task.window(messageCollector, taskCoordinator);
231 Assert.assertEquals(windowPanes.size(), 2);
232 Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
233 Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
234 Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
235 Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
236 Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4);
237 }
238
239 @Test
240 public void testCancellationOfOnceTrigger() throws Exception {
241 StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
242 Duration.ofSeconds(1), Triggers.count(2));
243 TestClock testClock = new TestClock();
244 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
245 task.init(config, taskContext);
246
247 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
248 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
249 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
250 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
251 Assert.assertEquals(windowPanes.size(), 1);
252 Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
253 Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
254 Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
255
256 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
257 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
258 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
259
260 Assert.assertEquals(windowPanes.size(), 1);
261
262 testClock.advanceTime(Duration.ofSeconds(1));
263 task.window(messageCollector, taskCoordinator);
264
265 Assert.assertEquals(windowPanes.size(), 2);
266 Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
267 Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
268 Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
269
270 task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
271 testClock.advanceTime(Duration.ofSeconds(1));
272 task.window(messageCollector, taskCoordinator);
273
274 Assert.assertEquals(windowPanes.size(), 3);
275 Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
276 Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
277 Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
278 Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
279
280 }
281
282 @Test
283 public void testCancellationOfAnyTrigger() throws Exception {
284 StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
285 Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
286 TestClock testClock = new TestClock();
287 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
288 task.init(config, taskContext);
289
290 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
291 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
292 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
293 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
294 //assert that the count trigger fired
295 Assert.assertEquals(windowPanes.size(), 1);
296
297 //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
298 testClock.advanceTime(Duration.ofMillis(500));
299
300 //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
301 Assert.assertEquals(windowPanes.size(), 1);
302
303 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
304 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
305 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
306
307 //advance timer by 500 more millis to enable the default trigger
308 testClock.advanceTime(Duration.ofMillis(500));
309 task.window(messageCollector, taskCoordinator);
310
311 //assert that the default trigger fired
312 Assert.assertEquals(windowPanes.size(), 2);
313 Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
314 Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
315 Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
316 Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
317
318 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
319
320 //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
321 testClock.advanceTime(Duration.ofMillis(500));
322 task.window(messageCollector, taskCoordinator);
323
324 Assert.assertEquals(windowPanes.size(), 3);
325 Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
326 Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
327 Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
328
329 //advance timer by > 500 millis to enable the default trigger
330 testClock.advanceTime(Duration.ofMillis(900));
331 task.window(messageCollector, taskCoordinator);
332 Assert.assertEquals(windowPanes.size(), 4);
333 Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
334 Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
335 Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
336 }
337
338 @Test
339 public void testCancelationOfRepeatingNestedTriggers() throws Exception {
340
341 StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
342 Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
343 List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
344
345 MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
346
347 TestClock testClock = new TestClock();
348 StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
349 task.init(config, taskContext);
350
351 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
352
353 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
354 //assert that the count trigger fired
355 Assert.assertEquals(windowPanes.size(), 1);
356
357 //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
358 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
359 testClock.advanceTime(Duration.ofMillis(500));
360 //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
361 task.window(messageCollector, taskCoordinator);
362 Assert.assertEquals(windowPanes.size(), 2);
363
364 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
365 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
366 Assert.assertEquals(windowPanes.size(), 3);
367
368 task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
369 //advance timer by 500 more millis to enable the default trigger
370 testClock.advanceTime(Duration.ofMillis(500));
371 task.window(messageCollector, taskCoordinator);
372 //assert that the default trigger fired
373 Assert.assertEquals(windowPanes.size(), 4);
374 }
375
376 private class KeyedTumblingWindowStreamApplication implements StreamApplication {
377
378 private final AccumulationMode mode;
379 private final Duration duration;
380 private final Trigger<IntegerEnvelope> earlyTrigger;
381 private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
382
383 KeyedTumblingWindowStreamApplication(AccumulationMode mode,
384 Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
385 this.mode = mode;
386 this.duration = timeDuration;
387 this.earlyTrigger = earlyTrigger;
388 }
389
390 @Override
391 public void init(StreamGraph graph, Config config) {
392 MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
393 (k, m) -> new IntegerEnvelope((Integer) k));
394 Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
395 inStream
396 .map(m -> m)
397 .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
398 .setAccumulationMode(mode))
399 .sink((message, messageCollector, taskCoordinator) -> {
400 messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
401 });
402 }
403 }
404
405 private class TumblingWindowStreamApplication implements StreamApplication {
406
407 private final AccumulationMode mode;
408 private final Duration duration;
409 private final Trigger<IntegerEnvelope> earlyTrigger;
410 private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
411
412 TumblingWindowStreamApplication(AccumulationMode mode,
413 Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
414 this.mode = mode;
415 this.duration = timeDuration;
416 this.earlyTrigger = earlyTrigger;
417 }
418
419 @Override
420 public void init(StreamGraph graph, Config config) {
421 MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
422 (k, m) -> new IntegerEnvelope((Integer) k));
423 Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
424 inStream
425 .map(m -> m)
426 .window(Windows.<IntegerEnvelope>tumblingWindow(duration).setEarlyTrigger(earlyTrigger)
427 .setAccumulationMode(mode))
428 .sink((message, messageCollector, taskCoordinator) -> {
429 messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
430 });
431 }
432 }
433
434 private class KeyedSessionWindowStreamApplication implements StreamApplication {
435
436 private final AccumulationMode mode;
437 private final Duration duration;
438 private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
439
440 KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
441 this.mode = mode;
442 this.duration = duration;
443 }
444
445 @Override
446 public void init(StreamGraph graph, Config config) {
447 MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
448 (k, m) -> new IntegerEnvelope((Integer) k));
449 Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
450
451 inStream
452 .map(m -> m)
453 .window(Windows.keyedSessionWindow(keyFn, duration)
454 .setAccumulationMode(mode))
455 .sink((message, messageCollector, taskCoordinator) -> {
456 messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
457 });
458 }
459 }
460
461 private class IntegerEnvelope extends IncomingMessageEnvelope {
462
463 IntegerEnvelope(Integer key) {
464 super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
465 }
466 }
467 }