SAMZA-1268: Javadoc cleanup for public APIs for 0.13 release
[samza.git] / samza-api / src / main / java / org / apache / samza / operators / windows / Windows.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.operators.windows;
21
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.TimeTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
35
36 /**
37 * APIs for creating different types of {@link Window}s.
38 *
39 * Groups incoming messages in a {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
40 *
41 * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated
42 * {@link Trigger}s that determine when results from the {@link Window} are emitted. Each emitted result contains one
43 * or more messages in the window and is called a {@link WindowPane}.
44 *
45 * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data
46 * for the window has arrived, or late triggers that allow handling late arrivals of data.
47 *
48 * window wk1
49 * +--------------------------------+
50 * ------------+--------+-----------+
51 * | | | |
52 * | pane 1 |pane2 | pane3 |
53 * +-----------+--------+-----------+
54 *
55 * -----------------------------------
56 * incoming message stream ------+
57 * -----------------------------------
58 * window wk2
59 * +---------------------+---------+
60 * | pane 1| pane 2 | pane 3 |
61 * | | | |
62 * +---------+-----------+---------+
63 *
64 * window wk3
65 * +----------+-----------+---------+
66 * | | | |
67 * | pane 1 | pane 2 | pane 3|
68 * | | | |
69 * +----------+-----------+---------+
70 *
71 *
72 * <p> A {@link Window} can be one of the following types:
73 * <ul>
74 * <li>
75 * Tumbling Window: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
76 * <li>
77 * Session Window: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
78 * A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
79 * The boundary for a session is defined by a {@code sessionGap}. All messages that that arrive within
80 * the gap are grouped into the same session.
81 * </ul>
82 *
83 * <p> A {@link Window} is said to be "keyed" when the incoming messages are first grouped based on their key
84 * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
85 * of the window types above.
86 *
87 * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link Supplier}
88 * and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is
89 * created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
90 * the emitted {@link WindowPane} will contain a collection of messages in the window.
91 *
92 * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
93 * finer granularity are not supported.
94 */
95 @InterfaceStability.Unstable
96 public final class Windows {
97
98 private Windows() { }
99
100 /**
101 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing
102 * time based windows based on the provided keyFn and applies the provided fold function to them.
103 *
104 * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
105 *
106 * <pre> {@code
107 * MessageStream<UserClick> stream = ...;
108 * Function<UserClick, String> keyFn = ...;
109 * Supplier<Integer> initialValue = () -> 0;
110 * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
111 * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
112 * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
113 * }
114 * </pre>
115 *
116 * @param keyFn the function to extract the window key from a message
117 * @param interval the duration in processing time
118 * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
119 * @param aggregator the function to incrementally update the window value. Invoked when a new message
120 * arrives for the window.
121 * @param <M> the type of the input message
122 * @param <WV> the type of the {@link WindowPane} output value
123 * @param <K> the type of the key in the {@link Window}
124 * @return the created {@link Window} function.
125 */
126 public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(
127 Function<? super M, ? extends K> keyFn, Duration interval,
128 Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) {
129
130 Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
131 return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
132 (Function<M, K>) keyFn, null, WindowType.TUMBLING);
133 }
134
135
136 /**
137 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
138 * processing time based windows using the provided keyFn.
139 *
140 * <p>The below example groups the stream into fixed-size 10 second windows for each key.
141 *
142 * <pre> {@code
143 * MessageStream<UserClick> stream = ...;
144 * Function<UserClick, String> keyFn = ...;
145 * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
146 * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
147 * }
148 * </pre>
149 *
150 * @param keyFn function to extract key from the message
151 * @param interval the duration in processing time
152 * @param <M> the type of the input message
153 * @param <K> the type of the key in the {@link Window}
154 * @return the created {@link Window} function
155 */
156 public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(
157 Function<? super M, ? extends K> keyFn, Duration interval) {
158 FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
159
160 Supplier<Collection<M>> initialValue = ArrayList::new;
161 return keyedTumblingWindow(keyFn, interval, initialValue, aggregator);
162 }
163
164 /**
165 * Creates a {@link Window} that windows values into fixed-size processing time based windows and aggregates
166 * them applying the provided function.
167 *
168 * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
169 *
170 * <pre> {@code
171 * MessageStream<String> stream = ...;
172 * Supplier<Integer> initialValue = () -> 0;
173 * FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
174 * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
175 * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
176 * }
177 * </pre>
178 *
179 * @param interval the duration in processing time
180 * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
181 * @param aggregator the function to incrementally update the window value. Invoked when a new message
182 * arrives for the window.
183 * @param <M> the type of the input message
184 * @param <WV> the type of the {@link WindowPane} output value
185 * @return the created {@link Window} function
186 */
187 public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue,
188 FoldLeftFunction<? super M, WV> aggregator) {
189 Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
190 return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
191 null, null, WindowType.TUMBLING);
192 }
193
194 /**
195 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
196 * processing time based windows.
197 *
198 * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.
199 *
200 * <pre> {@code
201 * MessageStream<Long> stream = ...;
202 * Function<Collection<Long>, Long> percentile99 = ..
203 *
204 * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream =
205 * integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
206 * MessageStream<Long> windowedPercentiles =
207 * windowedStream.map(windowPane -> percentile99(windowPane.getMessage());
208 * }
209 * </pre>
210 *
211 * @param duration the duration in processing time
212 * @param <M> the type of the input message
213 * @return the created {@link Window} function
214 */
215 public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
216 FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
217
218 Supplier<Collection<M>> initialValue = ArrayList::new;
219 return tumblingWindow(duration, initialValue, aggregator);
220 }
221
222 /**
223 * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided
224 * {@code sessionGap} and applies the provided fold function to them.
225 *
226 * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
227 * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages
228 * that arrive within the gap are grouped into the same session.
229 *
230 * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
231 *
232 * <pre> {@code
233 * MessageStream<UserClick> stream = ...;
234 * Supplier<Integer> initialValue = () -> 0;
235 * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
236 * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
237 * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
238 * Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
239 * }
240 * </pre>
241 *
242 * @param keyFn the function to extract the window key from a message
243 * @param sessionGap the timeout gap for defining the session
244 * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
245 * @param aggregator the function to incrementally update the window value. Invoked when a new message
246 * arrives for the window.
247 * @param <M> the type of the input message
248 * @param <K> the type of the key in the {@link Window}
249 * @param <WV> the type of the output value in the {@link WindowPane}
250 * @return the created {@link Window} function
251 */
252 public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(
253 Function<? super M, ? extends K> keyFn, Duration sessionGap,
254 Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) {
255 Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
256 return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
257 (Function<M, K>) keyFn, null, WindowType.SESSION);
258 }
259
260 /**
261 * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided
262 * {@code sessionGap}.
263 *
264 * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
265 * boundary for the session is defined by a {@code sessionGap}. All messages that that arrive within
266 * the gap are grouped into the same session.
267 *
268 * <p>The below example groups the stream into per-key session windows of gap 10 seconds.
269 *
270 * <pre> {@code
271 * MessageStream<UserClick> stream = ...;
272 * Supplier<Integer> initialValue = () -> 0;
273 * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
274 * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
275 * MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
276 * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
277 * }
278 * </pre>
279 *
280 * @param keyFn the function to extract the window key from a message}
281 * @param sessionGap the timeout gap for defining the session
282 * @param <M> the type of the input message
283 * @param <K> the type of the key in the {@link Window}
284 * @return the created {@link Window} function
285 */
286 public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(
287 Function<? super M, ? extends K> keyFn, Duration sessionGap) {
288
289 FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
290
291 Supplier<Collection<M>> initialValue = ArrayList::new;
292 return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator);
293 }
294
295
296 private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() {
297 return (m, c) -> {
298 c.add(m);
299 return c;
300 };
301 }
302
303 }