a0269cd15fdc61c95e868068636d51b6f204e8fc
[samza.git] / samza-api / src / main / java / org / apache / samza / operators / windows / Windows.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.operators.windows;
21
22 import org.apache.samza.annotation.InterfaceStability;
23 import org.apache.samza.operators.functions.FoldLeftFunction;
24 import org.apache.samza.operators.triggers.TimeTrigger;
25 import org.apache.samza.operators.triggers.Trigger;
26 import org.apache.samza.operators.triggers.Triggers;
27 import org.apache.samza.operators.windows.internal.WindowInternal;
28 import org.apache.samza.operators.windows.internal.WindowType;
29
30 import java.time.Duration;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.function.Function;
34 import java.util.function.Supplier;
35
36 /**
37 * APIs for creating different types of {@link Window}s.
38 *
39 * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
40 *
41 * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
42 * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
43 * messages in the window and is called a {@link WindowPane}.
44 *
45 * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
46 * has arrived or late triggers that allow handling of late data arrivals.
47 *
 * <pre>
 *                                  window wk1
 *                                  +-----------+--------+-----------+
 *                                  |           |        |           |
 *                                  | pane 1    | pane 2 |  pane 3   |
 *                                  +-----------+--------+-----------+
 *
 * -----------------------------
 *  incoming message stream ----+
 * -----------------------------
 *                                  window wk2
 *                                  +---------+-----------+---------+
 *                                  | pane 1  |  pane 2   | pane 3  |
 *                                  |         |           |         |
 *                                  +---------+-----------+---------+
 *
 *                                  window wk3
 *                                  +----------+-----------+---------+
 *                                  |          |           |         |
 *                                  | pane 1   |  pane 2   |  pane 3 |
 *                                  |          |           |         |
 *                                  +----------+-----------+---------+
 * </pre>
 *
72 * <p> A {@link Window} can be one of the following types:
73 * <ul>
74 * <li>
75 * Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
76 * <li>
77 * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
78 * A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
 *   The boundary for a session is defined by a {@code sessionGap}. All messages that arrive within
80 * the gap are grouped into the same session.
81 * <li>
82 * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
83 * An early trigger must be specified when defining a global window.
84 * </ul>
85 *
86 * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key
87 * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
88 * types.
89 *
90 * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
91 * finer granularity are not supported.
92 */
93 @InterfaceStability.Unstable
94 public final class Windows {
95
96 private Windows() { }
97
98 /**
99 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing
100 * time based windows based on the provided keyFn and applies the provided fold function to them.
101 *
102 * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
103 *
104 * <pre> {@code
105 * MessageStream<UserClick> stream = ...;
106 * Function<UserClick, String> keyFn = ...;
107 * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
108 * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
109 * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
110 * }
111 * </pre>
112 *
113 * @param keyFn the function to extract the window key from a message
114 * @param interval the duration in processing time
115 * @param initialValue the initial value to be used for aggregations
116 * @param foldFn the function to aggregate messages in the {@link WindowPane}
117 * @param <M> the type of the input message
118 * @param <WV> the type of the {@link WindowPane} output value
119 * @param <K> the type of the key in the {@link Window}
120 * @return the created {@link Window} function.
121 */
122 public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval,
123 Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
124
125 Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
126 return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
127 (Function<M, K>) keyFn, null, WindowType.TUMBLING);
128 }
129
130
131 /**
132 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
133 * processing time based windows using the provided keyFn.
134 *
135 * <p>The below example groups the stream into fixed-size 10 second windows for each key.
136 *
137 * <pre> {@code
138 * MessageStream<UserClick> stream = ...;
139 * Function<UserClick, String> keyFn = ...;
140 * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
141 * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
142 * }
143 * </pre>
144 *
145 * @param keyFn function to extract key from the message
146 * @param interval the duration in processing time
147 * @param <M> the type of the input message
148 * @param <K> the type of the key in the {@link Window}
149 * @return the created {@link Window} function
150 */
151 public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval) {
152 FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
153
154 Supplier<Collection<M>> initialValue = ArrayList::new;
155 return keyedTumblingWindow(keyFn, interval, initialValue, aggregator);
156 }
157
158 /**
159 * Creates a {@link Window} that windows values into fixed-size processing time based windows and aggregates
160 * them applying the provided function.
161 *
162 * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
163 *
164 * <pre> {@code
165 * MessageStream<String> stream = ...;
166 * BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
167 * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
168 * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
169 * }
170 * </pre>
171 *
172 * @param duration the duration in processing time
173 * @param initialValue the initial value to be used for aggregations
174 * @param foldFn to aggregate messages in the {@link WindowPane}
175 * @param <M> the type of the input message
176 * @param <WV> the type of the {@link WindowPane} output value
177 * @return the created {@link Window} function
178 */
179 public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue,
180 FoldLeftFunction<? super M, WV> foldFn) {
181 Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
182 return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
183 null, null, WindowType.TUMBLING);
184 }
185
186 /**
187 * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
188 * processing time based windows.
189 *
190 * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.
191 *
192 * <pre> {@code
193 * MessageStream<Long> stream = ...;
194 * Function<Collection<Long, Long>> percentile99 = ..
195 *
196 * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
197 * MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput -> percentile99(windowedOutput.getMessage());
198 * }
199 * </pre>
200 *
201 * @param duration the duration in processing time
202 * @param <M> the type of the input message
203 * @return the created {@link Window} function
204 */
205 public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
206 FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
207
208 Supplier<Collection<M>> initialValue = ArrayList::new;
209 return tumblingWindow(duration, initialValue, aggregator);
210 }
211
212 /**
213 * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}
214 * and applies the provided fold function to them.
215 *
216 * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
217 * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within
218 * the gap are grouped into the same session.
219 *
220 * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
221 *
222 * <pre> {@code
223 * MessageStream<UserClick> stream = ...;
224 * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
225 * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
226 * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
227 * Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
228 * }
229 * </pre>
230 *
231 * @param keyFn the function to extract the window key from a message
232 * @param sessionGap the timeout gap for defining the session
233 * @param initialValue the initial value to be used for aggregations
234 * @param foldFn the function to aggregate messages in the {@link WindowPane}
235 * @param <M> the type of the input message
236 * @param <K> the type of the key in the {@link Window}
237 * @param <WV> the type of the output value in the {@link WindowPane}
238 * @return the created {@link Window} function
239 */
240 public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap,
241 Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
242 Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
243 return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, (Function<M, K>) keyFn,
244 null, WindowType.SESSION);
245 }
246
247 /**
248 * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}.
249 *
250 * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
251 * boundary for the session is defined by a {@code sessionGap}. All messages that that arrive within
252 * the gap are grouped into the same session.
253 *
254 * <p>The below example groups the stream into per-key session windows of gap 10 seconds.
255 *
256 * <pre> {@code
257 * MessageStream<UserClick> stream = ...;
258 * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
259 * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
260 * MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
261 * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
262 * }
263 * </pre>
264 *
265 * @param keyFn the function to extract the window key from a message}
266 * @param sessionGap the timeout gap for defining the session
267 * @param <M> the type of the input message
268 * @param <K> the type of the key in the {@link Window}
269 * @return the created {@link Window} function
270 */
271 public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap) {
272
273 FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
274
275 Supplier<Collection<M>> initialValue = ArrayList::new;
276 return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator);
277 }
278
279
280 private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() {
281 return (m, c) -> {
282 c.add(m);
283 return c;
284 };
285 }
286
287 }