a0269cd15fdc61c95e868068636d51b6f204e8fc
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 package org
.apache
.samza
.operators
.windows
;
22 import org
.apache
.samza
.annotation
.InterfaceStability
;
23 import org
.apache
.samza
.operators
.functions
.FoldLeftFunction
;
24 import org
.apache
.samza
.operators
.triggers
.TimeTrigger
;
25 import org
.apache
.samza
.operators
.triggers
.Trigger
;
26 import org
.apache
.samza
.operators
.triggers
.Triggers
;
27 import org
.apache
.samza
.operators
.windows
.internal
.WindowInternal
;
28 import org
.apache
.samza
.operators
.windows
.internal
.WindowType
;
30 import java
.time
.Duration
;
31 import java
.util
.ArrayList
;
32 import java
.util
.Collection
;
33 import java
.util
.function
.Function
;
34 import java
.util
.function
.Supplier
;
37 * APIs for creating different types of {@link Window}s.
39 * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
41 * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
42 * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
43 * messages in the window and is called a {@link WindowPane}.
45 * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
46 * has arrived or late triggers that allow handling of late data arrivals.
49 * +--------------------------------+
50 * ------------+--------+-----------+
52 * | pane 1 |pane2 | pane3 |
53 * +-----------+--------+-----------+
55 -----------------------------------
56 *incoming message stream ------+
57 -----------------------------------
59 * +---------------------+---------+
60 * | pane 1| pane 2 | pane 3 |
62 * +---------+-----------+---------+
65 * +----------+-----------+---------+
67 * | pane 1 | pane 2 | pane 3|
69 * +----------+-----------+---------+
72 * <p> A {@link Window} can be one of the following types:
75 * Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
77 * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
78 * A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
79 * The boundary for a session is defined by a {@code sessionGap}. All messages that arrive within
80 * the gap are grouped into the same session.
82 * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
83 * An early trigger must be specified when defining a global window.
86 * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key
87 * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window types.
90 * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
91 * finer granularity are not supported.
93 @InterfaceStability.Unstable
94 public final class Windows
{
99 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing
100 * time based windows based on the provided keyFn and applies the provided fold function to them.
102 * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
105 * MessageStream<UserClick> stream = ...;
106 * Function<UserClick, String> keyFn = ...;
107 * FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
108 * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
109 * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), () -> Integer.MIN_VALUE, maxAggregator));
113 * @param keyFn the function to extract the window key from a message
114 * @param interval the duration in processing time
115 * @param initialValue the initial value to be used for aggregations
116 * @param foldFn the function to aggregate messages in the {@link WindowPane}
117 * @param <M> the type of the input message
118 * @param <WV> the type of the {@link WindowPane} output value
119 * @param <K> the type of the key in the {@link Window}
120 * @return the created {@link Window} function.
122 public static <M
, K
, WV
> Window
<M
, K
, WV
> keyedTumblingWindow(Function
<?
super M
, ?
extends K
> keyFn
, Duration interval
,
123 Supplier
<?
extends WV
> initialValue
, FoldLeftFunction
<?
super M
, WV
> foldFn
) {
125 Trigger
<M
> defaultTrigger
= new TimeTrigger
<>(interval
);
126 return new WindowInternal
<>(defaultTrigger
, (Supplier
<WV
>) initialValue
, (FoldLeftFunction
<M
, WV
>) foldFn
,
127 (Function
<M
, K
>) keyFn
, null
, WindowType
.TUMBLING
);
132 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
133 * processing time based windows using the provided keyFn.
135 * <p>The below example groups the stream into fixed-size 10 second windows for each key.
138 * MessageStream<UserClick> stream = ...;
139 * Function<UserClick, String> keyFn = ...;
140 * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
141 * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
145 * @param keyFn function to extract key from the message
146 * @param interval the duration in processing time
147 * @param <M> the type of the input message
148 * @param <K> the type of the key in the {@link Window}
149 * @return the created {@link Window} function
151 public static <M
, K
> Window
<M
, K
, Collection
<M
>> keyedTumblingWindow(Function
<?
super M
, ?
extends K
> keyFn
, Duration interval
) {
152 FoldLeftFunction
<M
, Collection
<M
>> aggregator
= createAggregator();
154 Supplier
<Collection
<M
>> initialValue
= ArrayList
::new;
155 return keyedTumblingWindow(keyFn
, interval
, initialValue
, aggregator
);
159 * Creates a {@link Window} that windows values into fixed-size processing time based windows and aggregates
160 * them applying the provided function.
162 * <p>The below example computes the maximum value over fixed size 10 second windows.
165 * MessageStream<String> stream = ...;
166 * FoldLeftFunction<String, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
167 * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
168 * Windows.tumblingWindow(Duration.ofSeconds(10), () -> Integer.MIN_VALUE, maxAggregator));
172 * @param duration the duration in processing time
173 * @param initialValue the initial value to be used for aggregations
174 * @param foldFn to aggregate messages in the {@link WindowPane}
175 * @param <M> the type of the input message
176 * @param <WV> the type of the {@link WindowPane} output value
177 * @return the created {@link Window} function
179 public static <M
, WV
> Window
<M
, Void
, WV
> tumblingWindow(Duration duration
, Supplier
<?
extends WV
> initialValue
,
180 FoldLeftFunction
<?
super M
, WV
> foldFn
) {
181 Trigger
<M
> defaultTrigger
= new TimeTrigger
<>(duration
);
182 return new WindowInternal
<>(defaultTrigger
, (Supplier
<WV
>) initialValue
, (FoldLeftFunction
<M
, WV
>) foldFn
,
183 null
, null
, WindowType
.TUMBLING
);
187 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
188 * processing time based windows.
190 * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.
193 * MessageStream<Long> stream = ...;
194 * Function<Collection<Long>, Long> percentile99 = ...;
196 * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
197 * MessageStream<Long> windowedPercentiles = windowedStream.map(windowedOutput -> percentile99.apply(windowedOutput.getMessage()));
201 * @param duration the duration in processing time
202 * @param <M> the type of the input message
203 * @return the created {@link Window} function
205 public static <M
> Window
<M
, Void
, Collection
<M
>> tumblingWindow(Duration duration
) {
206 FoldLeftFunction
<M
, Collection
<M
>> aggregator
= createAggregator();
208 Supplier
<Collection
<M
>> initialValue
= ArrayList
::new;
209 return tumblingWindow(duration
, initialValue
, aggregator
);
213 * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}
214 * and applies the provided fold function to them.
216 * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
217 * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within
218 * the gap are grouped into the same session.
220 * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
223 * MessageStream<UserClick> stream = ...;
224 * FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
225 * Function<UserClick, String> userIdExtractor = m -> m.getUserId();
226 * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
227 * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10), () -> Integer.MIN_VALUE, maxAggregator));
231 * @param keyFn the function to extract the window key from a message
232 * @param sessionGap the timeout gap for defining the session
233 * @param initialValue the initial value to be used for aggregations
234 * @param foldFn the function to aggregate messages in the {@link WindowPane}
235 * @param <M> the type of the input message
236 * @param <K> the type of the key in the {@link Window}
237 * @param <WV> the type of the output value in the {@link WindowPane}
238 * @return the created {@link Window} function
240 public static <M
, K
, WV
> Window
<M
, K
, WV
> keyedSessionWindow(Function
<?
super M
, ?
extends K
> keyFn
, Duration sessionGap
,
241 Supplier
<?
extends WV
> initialValue
, FoldLeftFunction
<?
super M
, WV
> foldFn
) {
242 Trigger
<M
> defaultTrigger
= Triggers
.timeSinceLastMessage(sessionGap
);
243 return new WindowInternal
<>(defaultTrigger
, (Supplier
<WV
>) initialValue
, (FoldLeftFunction
<M
, WV
>) foldFn
, (Function
<M
, K
>) keyFn
,
244 null
, WindowType
.SESSION
);
248 * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}.
250 * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
251 * boundary for the session is defined by a {@code sessionGap}. All messages that arrive within
252 * the gap are grouped into the same session.
254 * <p>The below example groups the stream into per-key session windows of gap 10 seconds.
257 * MessageStream<UserClick> stream = ...;
258 * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
259 * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
260 * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
261 * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
265 * @param keyFn the function to extract the window key from a message
266 * @param sessionGap the timeout gap for defining the session
267 * @param <M> the type of the input message
268 * @param <K> the type of the key in the {@link Window}
269 * @return the created {@link Window} function
271 public static <M
, K
> Window
<M
, K
, Collection
<M
>> keyedSessionWindow(Function
<?
super M
, ?
extends K
> keyFn
, Duration sessionGap
) {
273 FoldLeftFunction
<M
, Collection
<M
>> aggregator
= createAggregator();
275 Supplier
<Collection
<M
>> initialValue
= ArrayList
::new;
276 return keyedSessionWindow(keyFn
, sessionGap
, initialValue
, aggregator
);
280 private static <M
> FoldLeftFunction
<M
, Collection
<M
>> createAggregator() {