SAMZA-1268: Javadoc cleanup for public APIs for 0.13 release
authorPrateek Maheshwari <pmaheshw@linkedin.com>
Tue, 9 May 2017 02:24:03 +0000 (19:24 -0700)
committerJacob Maes <jmaes@linkedin.com>
Tue, 9 May 2017 02:24:03 +0000 (19:24 -0700)
This PR cleans up javadocs for Fluent API classes in the samza-api module.
Also updates the TaskContext (existing) and ContextManager (new) interfaces to add support for setting an pass-through user-defined context.

Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>

Closes #169 from prateekm/api-docs-cleanup

19 files changed:
checkstyle/checkstyle-suppressions.xml
samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
samza-api/src/main/java/org/apache/samza/operators/ContextManager.java
samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java

index a88b341..d227929 100644 (file)
@@ -30,5 +30,7 @@
   <suppress checks="ConstantName"
             files="ApplicationStatus.java"
             lines="26-29"/>
+  <suppress checks="UnusedImports"
+            files="StreamApplication.java"/>
 </suppressions>
 
index a26c5af..0d77295 100644 (file)
@@ -20,54 +20,69 @@ package org.apache.samza.application;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
-
+import org.apache.samza.operators.functions.InitableFunction;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
 
 /**
- * This interface defines a template for stream application that user will implement to initialize operator DAG in {@link StreamGraph}.
- *
+ * Describes and initializes the transforms for processing message streams and generating results.
  * <p>
- * User program implements {@link StreamApplication#init(StreamGraph, Config)} method to initialize the transformation logic
- * from all input streams to output streams. A simple user code example is shown below:
- * </p>
- *
+ * The following example removes page views older than 1 hour from the input stream:
  * <pre>{@code
- * public class PageViewCounterExample implements StreamApplication {
- *   // max timeout is 60 seconds
- *   private static final MAX_TIMEOUT = 60000;
- *
+ * public class PageViewCounter implements StreamApplication {
  *   public void init(StreamGraph graph, Config config) {
- *     MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
- *     OutputStream<String, PageViewEvent, PageViewEvent> pageViewEventFilteredStream = graph
- *       .getOutputStream("pageViewEventFiltered", m -> m.memberId, m -> m);
+ *     MessageStream<PageViewEvent> pageViewEvents =
+ *       graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
+ *     OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
+ *       graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);
  *
  *     pageViewEvents
- *       .filter(m -> !(m.getMessage().getEventTime() < System.currentTimeMillis() - MAX_TIMEOUT))
- *       .sendTo(pageViewEventFilteredStream);
+ *       .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
+ *       .sendTo(filteredPageViewEvents);
  *   }
- *
- *   // local execution mode
+ * }
+ * }</pre>
+ *<p>
+ * The example above can be run using an ApplicationRunner:
+ * <pre>{@code
  *   public static void main(String[] args) {
  *     CommandLine cmdLine = new CommandLine();
  *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- *     PageViewCounterExample userApp = new PageViewCounterExample();
- *     ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
- *     localRunner.run(userApp);
+ *     PageViewCounter app = new PageViewCounter();
+ *     LocalApplicationRunner runner = new LocalApplicationRunner(config);
+ *     runner.run(app);
+ *     runner.waitForFinish();
  *   }
- *
- * }
  * }</pre>
- *
+ * <p>
+ * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
+ * A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
+ * {@link StreamTask} instance used for processing incoming messages. Execution is synchronous and thread-safe
+ * within each {@link StreamTask}.
  */
 @InterfaceStability.Unstable
 public interface StreamApplication {
 
   /**
-   * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
-   * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
+   * Describes and initializes the transforms for processing message streams and generating results.
+   * <p>
+   * The {@link StreamGraph} provides access to input and output streams. Input {@link MessageStream}s can be
+   * transformed into other {@link MessageStream}s or sent to an {@link OutputStream} using the {@link MessageStream}
+   * operators.
+   * <p>
+   * Most operators accept custom functions for doing the transformations. These functions are {@link InitableFunction}s
+   * and are provided the {@link Config} and {@link TaskContext} during their own initialization. The config and the
+   * context can be used, for example, to create custom metrics or access durable state stores.
+   * <p>
+   * A shared context between {@link InitableFunction}s for different operators within a task instance can be set
+   * up by providing a {@link ContextManager} using {@link StreamGraph#withContextManager}.
    *
-   * @param graph  an empty {@link StreamGraph} object to be initialized
-   * @param config  the {@link Config} of the application
+   * @param graph the {@link StreamGraph} to get input/output streams from
+   * @param config the configuration for the application
    */
   void init(StreamGraph graph, Config config);
 
index c3b1cf3..5f2c020 100644 (file)
@@ -24,24 +24,26 @@ import org.apache.samza.task.TaskContext;
 
 
 /**
- * Interface class defining methods to initialize and finalize the context used by the transformation functions.
+ * Manages custom context that is shared across multiple operator functions in a task.
  */
 @InterfaceStability.Unstable
 public interface ContextManager {
+
   /**
-   * The initialization method to create shared context for the whole task in Samza. Default to NO-OP
+   * Allows initializing and setting a custom context that is shared across multiple operator functions in a task.
+   * <p>
+   * This method is invoked before any {@link org.apache.samza.operators.functions.InitableFunction}s are initialized.
+   * Use {@link TaskContext#setUserContext(Object)} to set the context here and {@link TaskContext#getUserContext()} to
+   * get it in InitableFunctions.
    *
-   * @param config  the configuration object for the task
-   * @param context  the {@link TaskContext} object
-   * @return  User-defined task-wide context object
+   * @param config the {@link Config} for the application
+   * @param context the {@link TaskContext} for this task
    */
-  default TaskContext initTaskContext(Config config, TaskContext context) {
-    return context;
-  }
+  void init(Config config, TaskContext context);
 
   /**
-   * The finalize method to allow users to close resource initialized in {@link #initTaskContext} method. Default to NO-OP.
-   *
+   * Allows closing the custom context that is shared across multiple operator functions in a task.
    */
-  default void finalizeTaskContext() { }
+  void close();
+
 }
index 91ef44c..7e7a537 100644 (file)
@@ -33,11 +33,11 @@ import java.util.function.Function;
 
 
 /**
- * Represents a stream of messages.
+ * A stream of messages that can be transformed into another {@link MessageStream}.
  * <p>
- * A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
+ * A {@link MessageStream} corresponding to an input stream can be obtained using {@link StreamGraph#getInputStream}.
  *
- * @param <M>  type of messages in this stream
+ * @param <M> the type of messages in this stream
  */
 @InterfaceStability.Unstable
 public interface MessageStream<M> {
@@ -47,46 +47,49 @@ public interface MessageStream<M> {
    * transformed {@link MessageStream}.
    *
    * @param mapFn the function to transform a message to another message
-   * @param <TM> the type of messages in the transformed {@link MessageStream}
+   * @param <OM> the type of messages in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
-  <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn);
+  <OM> MessageStream<OM> map(MapFunction<? super M, ? extends OM> mapFn);
 
   /**
    * Applies the provided 1:n function to transform a message in this {@link MessageStream}
    * to n messages in the transformed {@link MessageStream}
    *
    * @param flatMapFn the function to transform a message to zero or more messages
-   * @param <TM> the type of messages in the transformed {@link MessageStream}
+   * @param <OM> the type of messages in the transformed {@link MessageStream}
    * @return the transformed {@link MessageStream}
    */
-  <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn);
+  <OM> MessageStream<OM> flatMap(FlatMapFunction<? super M, ? extends OM> flatMapFn);
 
   /**
    * Applies the provided function to messages in this {@link MessageStream} and returns the
-   * transformed {@link MessageStream}.
+   * filtered {@link MessageStream}.
    * <p>
    * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
-   * should be retained in the transformed {@link MessageStream}.
+   * should be retained in the filtered {@link MessageStream}.
    *
-   * @param filterFn the predicate to filter messages from this {@link MessageStream}
+   * @param filterFn the predicate to filter messages from this {@link MessageStream}.
    * @return the transformed {@link MessageStream}
    */
   MessageStream<M> filter(FilterFunction<? super M> filterFn);
 
   /**
    * Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}.
-   *
-   * NOTE: If the output is for a {@link org.apache.samza.system.SystemStream}, use
-   * {@link #sendTo(OutputStream)} instead. This transform should only be used to output to
-   * non-stream systems (e.g., an external database).
+   * <p>
+   * Offers more control over processing and sending messages than {@link #sendTo(OutputStream)} since
+   * the {@link SinkFunction} has access to the {@link org.apache.samza.task.MessageCollector} and
+   * {@link org.apache.samza.task.TaskCoordinator}.
+   * <p>
+   * This can also be used to send output to a system (e.g. a database) that doesn't have a corresponding
+   * Samza SystemProducer implementation.
    *
    * @param sinkFn the function to send messages in this stream to an external system
    */
   void sink(SinkFunction<? super M> sinkFn);
 
   /**
-   * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
+   * Allows sending messages in this {@link MessageStream} to an {@link OutputStream}.
    *
    * @param outputStream the output stream to send messages to
    * @param <K> the type of key in the outgoing message
@@ -100,6 +103,8 @@ public interface MessageStream<M> {
    * {@link WindowPane}s.
    * <p>
    * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
+   * <p>
+   * <b>Note:</b> As of version 0.13.0, messages in windows are kept in memory and may be lost in case of failures.
    *
    * @param window the window to group and process messages from this {@link MessageStream}
    * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
@@ -110,23 +115,27 @@ public interface MessageStream<M> {
   <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
 
   /**
-   * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
+   * Joins this {@link MessageStream} with another {@link MessageStream} using the provided
+   * pairwise {@link JoinFunction}.
    * <p>
-   * Messages in each stream are retained (currently, in memory) for the provided {@code ttl} and join results are
+   * Messages in each stream are retained for the provided {@code ttl} and join results are
    * emitted as matches are found.
+   * <p>
+   * <b>Note:</b> As of version 0.13.0, messages in joins are kept in memory and may be lost in case of failures.
    *
    * @param otherStream the other {@link MessageStream} to be joined with
    * @param joinFn the function to join messages from this and the other {@link MessageStream}
    * @param ttl the ttl for messages in each stream
    * @param <K> the type of join key
-   * @param <OM> the type of messages in the other stream
-   * @param <TM> the type of messages resulting from the {@code joinFn}
+   * @param <JM> the type of messages in the other stream
+   * @param <OM> the type of messages resulting from the {@code joinFn}
    * @return the joined {@link MessageStream}
    */
-  <K, OM, TM> MessageStream<TM> join(MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl);
+  <K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,
+      JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl);
 
   /**
-   * Merge all {@code otherStreams} with this {@link MessageStream}.
+   * Merges all {@code otherStreams} with this {@link MessageStream}.
    *
    * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
    * @return the merged {@link MessageStream}
@@ -136,6 +145,9 @@ public interface MessageStream<M> {
   /**
    * Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
    * them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
+   * <p>
+   * <b>Note</b>: Repartitioned streams are created automatically in the default system. The key and message Serdes
+   * configured for the default system must be able to serialize and deserialize types K and M respectively.
    *
    * @param keyExtractor the {@link Function} to extract the output message key and partition key from
    *                     the input message
index d299068..ea6721b 100644 (file)
@@ -24,16 +24,17 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 
 /**
- * Provides APIs for accessing {@link MessageStream}s to be used to create the DAG of transforms.
+ * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe the processing logic.
  */
 @InterfaceStability.Unstable
 public interface StreamGraph {
 
   /**
-   * Gets the input {@link MessageStream} corresponding to the logical {@code streamId}. Multiple invocations of
-   * this method with the same {@code streamId} will throw an {@link IllegalStateException}
+   * Gets the input {@link MessageStream} corresponding to the {@code streamId}.
+   * <p>
+   * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
    *
-   * @param streamId the unique logical ID for the stream
+   * @param streamId the unique ID for the stream
    * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
    *                   in the input {@link MessageStream}
    * @param <K> the type of key in the incoming message
@@ -45,10 +46,11 @@ public interface StreamGraph {
   <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder);
 
   /**
-   * Gets the {@link OutputStream} corresponding to the logical {@code streamId}. Multiple invocations of
-   * this method with the same {@code streamId} will throw an {@link IllegalStateException}
+   * Gets the {@link OutputStream} corresponding to the {@code streamId}.
+   * <p>
+   * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
    *
-   * @param streamId the unique logical ID for the stream
+   * @param streamId the unique ID for the stream
    * @param keyExtractor the {@link Function} to extract the outgoing key from the output message
    * @param msgExtractor the {@link Function} to extract the outgoing message from the output message
    * @param <K> the type of key in the outgoing message
@@ -62,12 +64,12 @@ public interface StreamGraph {
 
   /**
    * Sets the {@link ContextManager} for this {@link StreamGraph}.
-   *
-   * The provided {@code contextManager} will be initialized before the transformation functions
-   * and can be used to setup shared context between them.
+   * <p>
+   * The provided {@link ContextManager} can be used to setup shared context between the operator functions
+   * within a task instance
    *
    * @param contextManager the {@link ContextManager} to use for the {@link StreamGraph}
-   * @return the {@link StreamGraph} with the {@code contextManager} as its {@link ContextManager}
+   * @return the {@link StreamGraph} with {@code contextManager} set as its {@link ContextManager}
    */
   StreamGraph withContextManager(ContextManager contextManager);
 
index 143bae0..cd49d1b 100644 (file)
@@ -22,7 +22,8 @@ import org.apache.samza.annotation.InterfaceStability;
 
 
 /**
- * A function that specifies whether a message should be retained for further processing or filtered out.
+ * Specifies whether a message should be retained for further processing.
+ *
  * @param <M>  type of the input message
  */
 @InterfaceStability.Unstable
@@ -31,7 +32,8 @@ public interface FilterFunction<M> extends InitableFunction {
 
   /**
    * Returns a boolean indicating whether this message should be retained or filtered out.
-   * @param message  the input message to be checked. This object should not be mutated.
+   *
+   * @param message  the input message to be checked
    * @return  true if {@code message} should be retained
    */
   boolean apply(M message);
index bbbddeb..e6c4958 100644 (file)
@@ -24,8 +24,8 @@ import java.util.Collection;
 
 
 /**
- * A function that transforms an input message into a collection of 0 or more messages,
- * possibly of a different type.
+ * Transforms an input message into a collection of 0 or more messages, possibly of a different type.
+ *
  * @param <M>  type of the input message
  * @param <OM>  type of the transformed messages
  */
@@ -35,6 +35,7 @@ public interface FlatMapFunction<M, OM>  extends InitableFunction {
 
   /**
    * Transforms the provided message into a collection of 0 or more messages.
+   *
    * @param message  the input message to be transformed
    * @return  a collection of 0 or more transformed messages
    */
index 58e88fd..25728fc 100644 (file)
 package org.apache.samza.operators.functions;
 
 /**
- * A fold function that incrementally combines and aggregates values for a window.
+ * Incrementally updates the window value as messages are added to the window.
  */
 public interface FoldLeftFunction<M, WV> extends InitableFunction {
 
   /**
-   * Incrementally combine and aggregate values for the window. Guaranteed to be invoked for every
-   * message added to the window.
+   * Incrementally updates the window value as messages are added to the window.
    *
-   * @param message the incoming message that is added to the window. This object should not be mutated.
-   * @param oldValue the previous value
+   * @param message the message being added to the window
+   * @param oldValue the previous value associated with the window
    * @return the new value
    */
   WV apply(M message, WV oldValue);
index 2f738da..4f9fad7 100644 (file)
@@ -24,16 +24,16 @@ import org.apache.samza.task.TaskContext;
 
 
 /**
- * interface defined to initalize the context of message transformation functions
+ * A function that can be initialized before execution.
  */
 @InterfaceStability.Unstable
 public interface InitableFunction {
 
   /**
-   * Interface method to initialize the context for a specific message transformation function.
+   * Initializes the function before any messages are processed.
    *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
+   * @param config the {@link Config} for the application
+   * @param context the {@link TaskContext} for this task
    */
   default void init(Config config, TaskContext context) { }
 
index fc38177..f30a47d 100644 (file)
@@ -22,8 +22,8 @@ import org.apache.samza.annotation.InterfaceStability;
 
 
 /**
- * A function that joins messages from two {@link org.apache.samza.operators.MessageStream}s and produces
- * a joined message.
+ * Joins incoming messages in two streams by key.
+ *
  * @param <K>  type of the join key
  * @param <M>  type of the input message
  * @param <JM>  type of the message to join with
@@ -33,7 +33,8 @@ import org.apache.samza.annotation.InterfaceStability;
 public interface JoinFunction<K, M, JM, RM>  extends InitableFunction {
 
   /**
-   * Join the provided input messages and produces the joined messages.
+   * Joins the provided messages and returns the joined message.
+   *
    * @param message  the input message
    * @param otherMessage  the message to join with
    * @return  the joined message
@@ -41,17 +42,17 @@ public interface JoinFunction<K, M, JM, RM>  extends InitableFunction {
   RM apply(M message, JM otherMessage);
 
   /**
-   * Method to get the join key in the messages from the first input stream
+   * Get the join key for messages in the first input stream.
    *
-   * @param message  the input message from the first input stream
+   * @param message  the message in the first input stream
    * @return  the join key
    */
   K getFirstKey(M message);
 
   /**
-   * Method to get the join key in the messages from the second input stream
+   * Get the join key for messages in the second input stream.
    *
-   * @param message  the input message from the second input stream
+   * @param message  the message in the second input stream
    * @return  the join key
    */
   K getSecondKey(JM message);
index b09fb99..240039f 100644 (file)
@@ -22,7 +22,8 @@ import org.apache.samza.annotation.InterfaceStability;
 
 
 /**
- * A function that transforms an input message into another message, possibly of a different type.
+ * Transforms an input message into another message, possibly of a different type.
+ *
  * @param <M>  type of the input message
  * @param <OM>  type of the transformed message
  */
@@ -33,7 +34,7 @@ public interface MapFunction<M, OM>  extends InitableFunction {
   /**
    * Transforms the provided message into another message.
    *
-   * @param message  the input message to be transformed. This object should not be mutated.
+   * @param message  the input message to be transformed
    * @return  the transformed message
    */
   OM apply(M message);
index 1d140ee..83aa0a1 100644 (file)
@@ -24,7 +24,8 @@ import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * A function that allows sending a message to an output system.
+ * Allows sending a message to an output system.
+ *
  * @param <M>  type of the input message
  */
 @InterfaceStability.Unstable
index 9609292..321cc26 100644 (file)
@@ -22,21 +22,20 @@ import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.triggers.Trigger;
 
 /**
- * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite
- * windows for processing.
+ * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
  *
- * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
- * that determine when results from the {@link Window} are emitted.
+ * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated
+ * {@link Trigger}s that determine when results from the {@link Window} are emitted.
  *
  * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}.
  * A pane can include all messages collected for the window so far or only the new messages
  * since the last emitted pane. (as determined by the {@link AccumulationMode})
  *
- * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
- * has arrived or late triggers that allow handling of late data arrivals.
+ * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the
+ * window has arrived, or late triggers that allow handling late arrivals of data.
  *
- * <p> A {@link Window} is defined as "keyed" when the incoming {@link org.apache.samza.operators.MessageStream} is first
- * partitioned based on the provided key, and windowing is applied on the partitioned stream.
+ * <p> A {@link Window} is said to be as "keyed" when the incoming {@link org.apache.samza.operators.MessageStream}
+ * is first grouped based on the provided key, and windowing is applied on the grouped stream.
  *
  *                                     window wk1 (with its triggers)
  *                                      +--------------------------------+
@@ -45,9 +44,9 @@ import org.apache.samza.operators.triggers.Trigger;
  *                                      | pane 1    |pane2   |   pane3   |
  *                                      +-----------+--------+-----------+
  *
- -----------------------------------
- *incoming message stream ------+
- -----------------------------------
-----------------------------------
+ *     incoming message stream ------+
-----------------------------------
  *                                      window wk2
  *                                      +---------------------+---------+
  *                                      |   pane 1|   pane 2  |  pane 3 |
@@ -62,19 +61,19 @@ import org.apache.samza.operators.triggers.Trigger;
  *                                      +----------+-----------+---------+
  *
  *
- * <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers}
- * APIs to create triggers.
+ * <p> Use {@link Windows} to create various windows and {@link org.apache.samza.operators.triggers.Triggers}
+ * to create their triggers.
  *
  * @param <M> the type of the input message
- * @param <K> the type of the key in the message in this {@link org.apache.samza.operators.MessageStream}.
- * @param <WV> the type of the value in the {@link WindowPane}.
+ * @param <K> the type of the key in the message
+ * @param <WV> the type of the value in the window
  */
 @InterfaceStability.Unstable
 public interface Window<M, K, WV> {
 
   /**
    * Set the early triggers for this {@link Window}.
-   * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
+   * <p> Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
    *
    * @param trigger the early trigger
    * @return the {@link Window} function with the early trigger
@@ -83,7 +82,7 @@ public interface Window<M, K, WV> {
 
   /**
    * Set the late triggers for this {@link Window}.
-   * <p>Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
+   * <p> Use the {@link org.apache.samza.operators.triggers.Triggers} APIs to create instances of {@link Trigger}
    *
    * @param trigger the late trigger
    * @return the {@link Window} function with the late trigger
@@ -92,17 +91,16 @@ public interface Window<M, K, WV> {
 
   /**
    * Specify how a {@link Window} should process its previously emitted {@link WindowPane}s.
-   *
    * <p> There are two types of {@link AccumulationMode}s:
    * <ul>
-   *  <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window (key) so far, even if they were
-   * included in previously emitted window panes.
-   *  <li> DISCARDING: Specifies that window panes should only include messages collected for this window (key) since the last emitted
-   * window pane.
+   *  <li> ACCUMULATING: Specifies that window panes should include all messages collected for the window so far,
+   *  even if they were included in previously emitted window panes.
+   *  <li> DISCARDING: Specifies that window panes should only include messages collected for this window since
+   *  the last emitted window pane.
    * </ul>
    *
    * @param mode the accumulation mode
-   * @return the {@link Window} function with the specified {@link AccumulationMode}.
+   * @return the {@link Window} function with {@code mode} set as its {@link AccumulationMode}.
    */
   Window<M, K, WV> setAccumulationMode(AccumulationMode mode);
 
index a0269cd..bb837f6 100644 (file)
@@ -36,14 +36,14 @@ import java.util.function.Supplier;
 /**
  * APIs for creating different types of {@link Window}s.
  *
- * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
+ * Groups incoming messages in a {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
  *
- * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
- * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
- * messages in the window and is called a {@link WindowPane}.
+ * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated
+ * {@link Trigger}s that determine when results from the {@link Window} are emitted. Each emitted result contains one
+ * or more messages in the window and is called a {@link WindowPane}.
  *
- * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
- * has arrived or late triggers that allow handling of late data arrivals.
+ * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data
+ * for the window has arrived, or late triggers that allow handling late arrivals of data.
  *
  *                                     window wk1
  *                                      +--------------------------------+
@@ -52,9 +52,9 @@ import java.util.function.Supplier;
  *                                      | pane 1    |pane2   |   pane3   |
  *                                      +-----------+--------+-----------+
  *
- -----------------------------------
- *incoming message stream ------+
- -----------------------------------
-----------------------------------
+ *     incoming message stream ------+
-----------------------------------
  *                                      window wk2
  *                                      +---------------------+---------+
  *                                      |   pane 1|   pane 2  |  pane 3 |
@@ -72,20 +72,22 @@ import java.util.function.Supplier;
  * <p> A {@link Window} can be one of the following types:
  * <ul>
  *   <li>
- *     Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
+ *     Tumbling Window: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals.
  *   <li>
- *     Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
+ *     Session Window: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
  *     A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
  *     The boundary for a session is defined by a {@code sessionGap}. All messages that that arrive within
  *     the gap are grouped into the same session.
- *   <li>
- *     Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
- *     An early trigger must be specified when defining a global window.
  * </ul>
  *
- * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key
- * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
- * types.
+ * <p> A {@link Window} is said to be "keyed" when the incoming messages are first grouped based on their key
+ * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
+ * of the window types above.
+ *
+ * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link Supplier}
+ * and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is
+ * created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
+ * the emitted {@link WindowPane} will contain a collection of messages in the window.
  *
  * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
  * finer granularity are not supported.
@@ -104,7 +106,8 @@ public final class Windows {
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
    *    Function<UserClick, String> keyFn = ...;
-   *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
+   *    Supplier<Integer> initialValue = () -> 0;
+   *    FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
    *    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
    *        Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
    * }
@@ -112,18 +115,20 @@ public final class Windows {
    *
    * @param keyFn the function to extract the window key from a message
    * @param interval the duration in processing time
-   * @param initialValue the initial value to be used for aggregations
-   * @param foldFn the function to aggregate messages in the {@link WindowPane}
+   * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
+   * @param aggregator the function to incrementally update the window value. Invoked when a new message
+   *                   arrives for the window.
    * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function.
    */
-  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval,
-                                                                Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
+  public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(
+      Function<? super M, ? extends K> keyFn, Duration interval,
+      Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) {
 
     Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
-    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
+    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
         (Function<M, K>) keyFn, null, WindowType.TUMBLING);
   }
 
@@ -148,7 +153,8 @@ public final class Windows {
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval) {
+  public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(
+      Function<? super M, ? extends K> keyFn, Duration interval) {
     FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
 
     Supplier<Collection<M>> initialValue = ArrayList::new;
@@ -163,23 +169,25 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<String> stream = ...;
-   *    BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
+   *    Supplier<Integer> initialValue = () -> 0;
+   *    FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
    *    MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
    *        Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
    * }
    * </pre>
    *
-   * @param duration the duration in processing time
-   * @param initialValue the initial value to be used for aggregations
-   * @param foldFn to aggregate messages in the {@link WindowPane}
+   * @param interval the duration in processing time
+   * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
+   * @param aggregator the function to incrementally update the window value. Invoked when a new message
+   *                   arrives for the window.
    * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @return the created {@link Window} function
    */
-  public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<? extends WV> initialValue,
-                                                           FoldLeftFunction<? super M, WV> foldFn) {
-    Trigger<M> defaultTrigger = new TimeTrigger<>(duration);
-    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn,
+  public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue,
+      FoldLeftFunction<? super M, WV> aggregator) {
+    Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
+    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
         null, null, WindowType.TUMBLING);
   }
 
@@ -191,10 +199,12 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<Long> stream = ...;
-   *    Function<Collection<Long, Long>> percentile99 = ..
+   *    Function<Collection<Long>, Long> percentile99 = ..
    *
-   *    MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
-   *    MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput -> percentile99(windowedOutput.getMessage());
+   *    MessageStream<WindowPane<Void, Collection<Long>>> windowedStream =
+   *        integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
+   *    MessageStream<Long> windowedPercentiles =
+   *        windowedStream.map(windowPane -> percentile99(windowPane.getMessage());
    * }
    * </pre>
    *
@@ -210,18 +220,19 @@ public final class Windows {
   }
 
   /**
-   * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}
-   * and applies the provided fold function to them.
+   * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided
+   * {@code sessionGap} and applies the provided fold function to them.
    *
    * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
-   * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within
-   * the gap are grouped into the same session.
+   * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages
+   * that arrive within the gap are grouped into the same session.
    *
    * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
    *
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
-   *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
+   *    Supplier<Integer> initialValue = () -> 0;
+   *    FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
    *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
    *    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
    *        Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
@@ -230,22 +241,25 @@ public final class Windows {
    *
    * @param keyFn the function to extract the window key from a message
    * @param sessionGap the timeout gap for defining the session
-   * @param initialValue the initial value to be used for aggregations
-   * @param foldFn the function to aggregate messages in the {@link WindowPane}
+   * @param initialValue the initial value supplier for the aggregator. Invoked when a new window is created.
+   * @param aggregator the function to incrementally update the window value. Invoked when a new message
+   *                   arrives for the window.
    * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @param <WV> the type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function
    */
-  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap,
-                                                               Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> foldFn) {
+  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(
+      Function<? super M, ? extends K> keyFn, Duration sessionGap,
+      Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator) {
     Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
-    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) foldFn, (Function<M, K>) keyFn,
-        null, WindowType.SESSION);
+    return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator,
+        (Function<M, K>) keyFn, null, WindowType.SESSION);
   }
 
   /**
-   * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}.
+   * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided
+   * {@code sessionGap}.
    *
    * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
    * boundary for the session is defined by a {@code sessionGap}. All messages that that arrive within
@@ -255,7 +269,8 @@ public final class Windows {
    *
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
-   *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
+   *    Supplier<Integer> initialValue = () -> 0;
+   *    FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
    *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
    *    MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
    *        Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
@@ -268,7 +283,8 @@ public final class Windows {
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn, Duration sessionGap) {
+  public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(
+      Function<? super M, ? extends K> keyFn, Duration sessionGap) {
 
     FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
 
index dc5742f..4ef3d30 100644 (file)
@@ -56,11 +56,18 @@ public interface TaskContext {
   void setStartingOffset(SystemStreamPartition ssp, String offset);
 
   /**
-   * Method to allow user to return customized context
+   * Sets the user-defined context.
    *
-   * @return  user-defined task context object
+   * @param context the user-defined context to set
    */
-  default Object getUserDefinedContext() {
+  default void setUserContext(Object context) { }
+
+  /**
+   * Gets the user-defined context.
+   *
+   * @return the user-defined context if set, else null
+   */
+  default Object getUserContext() {
     return null;
-  };
+  }
 }
index 31a75ce..1f1d282 100644 (file)
@@ -56,7 +56,7 @@ public class StreamGraphImpl implements StreamGraph {
   private final ApplicationRunner runner;
   private final Config config;
 
-  private ContextManager contextManager = new ContextManager() { };
+  private ContextManager contextManager = null;
 
   public StreamGraphImpl(ApplicationRunner runner, Config config) {
     // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems
index 4720298..b18cf06 100644 (file)
@@ -86,13 +86,15 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     // initialize the user-implemented stream application.
     this.streamApplication.init(streamGraph, config);
 
-    // get the user-implemented context manager and initialize the task-specific context.
+    // get the user-implemented context manager and initialize it
     this.contextManager = streamGraph.getContextManager();
-    TaskContext initializedTaskContext = this.contextManager.initTaskContext(config, context);
+    if (this.contextManager != null) {
+      this.contextManager.init(config, context);
+    }
 
     // create the operator impl DAG corresponding to the logical operator spec DAG
     OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
-    operatorImplGraph.init(streamGraph, config, initializedTaskContext);
+    operatorImplGraph.init(streamGraph, config, context);
     this.operatorImplGraph = operatorImplGraph;
 
     // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
@@ -135,6 +137,8 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
   @Override
   public void close() throws Exception {
-    this.contextManager.finalizeTaskContext();
+    if (this.contextManager != null) {
+      this.contextManager.close();
+    }
   }
 }
index c04776a..84e993b 100644 (file)
@@ -64,6 +64,7 @@ class TaskInstance(
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
   val context = new TaskContext {
+    var userContext: Object = null;
     def getMetricsRegistry = metrics.registry
     def getSystemStreamPartitions = systemStreamPartitions.asJava
     def getStore(storeName: String) = if (storageManager != null) {
@@ -80,6 +81,14 @@ class TaskInstance(
       val startingOffsets = offsetManager.startingOffsets
       offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
     }
+
+    override def setUserContext(context: Object): Unit = {
+      userContext = context
+    }
+
+    override def getUserContext: Object = {
+      userContext
+    }
   }
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
index 77a8960..666bbb8 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
 import org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 
 import java.util.function.BiFunction;
@@ -137,33 +136,6 @@ public class TestStreamGraphImpl {
   }
 
   @Test
-  public void testWithContextManager() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    // ensure that default is noop
-    TaskContext mockContext = mock(TaskContext.class);
-    assertEquals(graph.getContextManager().initTaskContext(mockConfig, mockContext), mockContext);
-
-    ContextManager testContextManager = new ContextManager() {
-      @Override
-      public TaskContext initTaskContext(Config config, TaskContext context) {
-        return null;
-      }
-
-      @Override
-      public void finalizeTaskContext() {
-
-      }
-    };
-
-    graph.withContextManager(testContextManager);
-    assertEquals(graph.getContextManager(), testContextManager);
-  }
-
-  @Test
   public void testGetIntermediateStream() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);