Javadoc cleanup for new Application, Descriptor, Context and Table APIs.
authorPrateek Maheshwari <pmaheshwari@apache.org>
Thu, 18 Oct 2018 06:58:16 +0000 (23:58 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Thu, 18 Oct 2018 06:58:16 +0000 (23:58 -0700)
Author: Prateek Maheshwari <pmaheshwari@apache.org>

Reviewers: Cameron Lee<calee@linkedin.com>

Closes #737 from prateekm/javadoc-cleanup

59 files changed:
samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java
samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
samza-api/src/main/java/org/apache/samza/application/TaskApplication.java
samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
samza-api/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptor.java
samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
samza-api/src/main/java/org/apache/samza/context/ContainerContext.java
samza-api/src/main/java/org/apache/samza/context/Context.java
samza-api/src/main/java/org/apache/samza/context/JobContext.java
samza-api/src/main/java/org/apache/samza/context/TaskContext.java
samza-api/src/main/java/org/apache/samza/system/descriptors/GenericInputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/GenericOutputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/GenericSystemDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/InputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/OutputDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/StreamDescriptor.java
samza-api/src/main/java/org/apache/samza/system/descriptors/SystemDescriptor.java
samza-api/src/main/java/org/apache/samza/table/Table.java
samza-api/src/main/java/org/apache/samza/table/descriptors/TableDescriptor.java
samza-api/src/main/java/org/apache/samza/task/TaskFactory.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaInputDescriptor.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaOutputDescriptor.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/descriptors/KafkaSystemDescriptor.java
samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java
samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
samza-test/src/main/java/org/apache/samza/example/MergeExample.java
samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
samza-test/src/main/java/org/apache/samza/example/TaskApplicationExample.java
samza-test/src/main/java/org/apache/samza/example/WindowExample.java
samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java
samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java

index 5423e2e..849b2b3 100644 (file)
@@ -23,19 +23,29 @@ import org.apache.samza.application.descriptors.ApplicationDescriptor;
 
 
 /**
- * The base interface for all user-implemented applications in Samza.
+ * A {@link SamzaApplication} describes the inputs, outputs, state, configuration and the logic
+ * for processing data from one or more streaming sources.
  * <p>
- * The main processing logic of the user application should be implemented in {@link SamzaApplication#describe(ApplicationDescriptor)}
- * method. Sub-classes {@link StreamApplication} and {@link TaskApplication} are specific interfaces for applications
- * written in high-level DAG and low-level task APIs, respectively.
+ * This is the base {@link SamzaApplication}. Implement a {@link StreamApplication} to describe the
+ * processing logic using Samza's High Level API in terms of {@link org.apache.samza.operators.MessageStream}
+ * operators, or a {@link TaskApplication} to describe it using Samza's Low Level API in terms of per-message
+ * processing logic.
+ * <p>
+ * A {@link SamzaApplication} implementation must have a no-argument constructor, which will be used by the framework
+ * to create new instances and call {@link #describe(ApplicationDescriptor)}.
+ * <p>
+ * Per container context may be managed using {@link org.apache.samza.context.ApplicationContainerContext} and
+ * set using {@link ApplicationDescriptor#withApplicationContainerContextFactory}. Similarly, per task context
+ * may be managed using {@link org.apache.samza.context.ApplicationTaskContext} and set using
+ * {@link ApplicationDescriptor#withApplicationTaskContextFactory}.
  */
 @InterfaceStability.Evolving
 public interface SamzaApplication<S extends ApplicationDescriptor> {
 
   /**
-   * Describes the user processing logic via {@link ApplicationDescriptor}
+   * Describes the inputs, outputs, state, configuration and processing logic using the provided {@code appDescriptor}.
    *
-   * @param appDesc the {@link ApplicationDescriptor} object to describe user application logic
+   * @param appDescriptor the {@link ApplicationDescriptor} to use for describing the application.
    */
-  void describe(S appDesc);
+  void describe(S appDescriptor);
 }
index fe77045..3749b58 100644 (file)
@@ -21,22 +21,38 @@ package org.apache.samza.application;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 
+
 /**
- * Describes and initializes the transforms for processing message streams and generating results in high-level API. 
+ * A {@link StreamApplication} describes the inputs, outputs, state, configuration and the processing logic
+ * in Samza's High Level API.
+ * <p>
+ * A typical {@link StreamApplication} implementation consists of the following stages:
+ * <ol>
+ *   <li>Configuring the inputs, outputs and state (tables) using the appropriate
+ *   {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
+ *   {@link org.apache.samza.system.descriptors.InputDescriptor}s,
+ *   {@link org.apache.samza.system.descriptors.OutputDescriptor}s and
+ *   {@link org.apache.samza.table.descriptors.TableDescriptor}s
+ *   <li>Obtaining the corresponding
+ *   {@link org.apache.samza.operators.MessageStream}s,
+ *   {@link org.apache.samza.operators.OutputStream}s and
+ *   {@link org.apache.samza.table.Table}s from the provided {@link StreamApplicationDescriptor}.
+ *   <li>Defining the processing logic using operators and functions on the streams and tables thus obtained.
+ *   E.g., {@link org.apache.samza.operators.MessageStream#filter(org.apache.samza.operators.functions.FilterFunction)}
+ * </ol>
  * <p>
- * The following example removes page views older than 1 hour from the input stream:
+ * The following example {@link StreamApplication} removes page views older than 1 hour from the input stream:
  * <pre>{@code
  * public class PageViewFilter implements StreamApplication {
- *   public void describe(StreamAppDescriptor appDesc) {
- *     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+ *   public void describe(StreamApplicationDescriptor appDescriptor) {
+ *     KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
  *     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
- *         trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
- *
+ *         trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  *     KafkaOutputDescriptor<PageViewEvent>> outputStreamDescriptor =
- *         trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)));
+ *         trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)));
  *
- *     MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
- *     OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor);
+ *     MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+ *     OutputStream<PageViewEvent> recentPageViewEvents = appDescriptor.getOutputStream(outputStreamDescriptor);
  *
  *     pageViewEvents
  *       .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
@@ -44,33 +60,20 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
  *   }
  * }
  * }</pre>
- *<p>
- * The example above can be run using an ApplicationRunner:
- * <pre>{@code
- *   public static void main(String[] args) {
- *     CommandLine cmdLine = new CommandLine();
- *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- *     PageViewFilter app = new PageViewFilter();
- *     ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
- *     runner.run();
- *     runner.waitForFinish();
- *   }
- * }</pre>
- *
+ * <p>
+ * All operator function implementations used in a {@link StreamApplication} must be {@link java.io.Serializable}. Any
+ * context required within an operator function may be managed by implementing the
+ * {@link org.apache.samza.operators.functions.InitableFunction#init} and
+ * {@link org.apache.samza.operators.functions.ClosableFunction#close} methods in the function implementation.
+ * <p>
+ * Functions may implement the {@link org.apache.samza.operators.functions.ScheduledFunction} interface
+ * to schedule and receive periodic callbacks from the Samza framework.
  * <p>
  * Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask}
  * during execution. The execution planner will generate a serialized DAG which will be deserialized in each
  * {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous
- * and thread-safe within each {@link org.apache.samza.task.StreamTask}.
- *
- * <p>
- * A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor
- * with no parameters to ensure successful instantiation in both local and remote environments.
- * Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},
- * {@link org.apache.samza.operators.functions.FilterFunction} for e.g.) are initable and closable. They are initialized
- * before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask}
- * instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
- * Function implementations are required to be {@link java.io.Serializable}.
+ * and thread-safe within each {@link org.apache.samza.task.StreamTask}. Multiple tasks may process their
+ * messages concurrently depending on the job parallelism configuration.
  */
 @InterfaceStability.Evolving
 public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> {
index d84aa12..4210393 100644 (file)
@@ -23,64 +23,49 @@ import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
 
 
 /**
- * Describes and initializes the transforms for processing message streams and generating results in low-level API. Your
- * application is expected to implement this interface.
+ * A {@link TaskApplication} describes the inputs, outputs, state, configuration and the processing logic
+ * in Samza's Low Level API.
+ * A typical {@link TaskApplication} implementation consists of the following stages:
+ * <ol>
+ *   <li>Configuring the inputs, outputs and state (tables) using the appropriate
+ *   {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
+ *   {@link org.apache.samza.system.descriptors.StreamDescriptor}s and
+ *   {@link org.apache.samza.table.descriptors.TableDescriptor}s
+ *   <li>Adding these descriptors to the provided {@link TaskApplicationDescriptor}.
+ *   <li>Defining the processing logic by implementing a {@link org.apache.samza.task.StreamTask} or
+ *   {@link org.apache.samza.task.AsyncStreamTask} that operates on each
+ *   {@link org.apache.samza.system.IncomingMessageEnvelope} one at a time.
+ *   <li>Setting a {@link org.apache.samza.task.TaskFactory} using
+ *   {@link TaskApplicationDescriptor#setTaskFactory(org.apache.samza.task.TaskFactory)} that creates instances of the
+ *   task above. The {@link org.apache.samza.task.TaskFactory} implementation must be {@link java.io.Serializable}.
+ * </ol>
  * <p>
- * The following example removes page views older than 1 hour from the input stream:
+ * The following example {@link TaskApplication} removes page views older than 1 hour from the input stream:
  * <pre>{@code
  * public class PageViewFilter implements TaskApplication {
- *   public void describe(TaskAppDescriptor appDesc) {
- *     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor(PageViewTask.SYSTEM);
+ *   public void describe(TaskApplicationDescriptor appDescriptor) {
+ *     KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
  *     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
- *         trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new JsonSerdeV2<>(PageViewEvent.class));
- *
+ *         trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
  *     KafkaOutputDescriptor<PageViewEvent>> outputStreamDescriptor =
- *         trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new JsonSerdeV2<>(PageViewEvent.class)));
+ *         trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)));
  *
- *     appDesc.addInputStream(inputStreamDescriptor);
- *     appDesc.addOutputStream(outputStreamDescriptor);
- *     appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
+ *     appDescriptor.addInputStream(inputStreamDescriptor);
+ *     appDescriptor.addOutputStream(outputStreamDescriptor);
+ *     appDescriptor.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
  *   }
  * }
  *
  * public class PageViewTask implements StreamTask {
- *   final static String TASK_INPUT = "pageViewEvents";
- *   final static String TASK_OUTPUT = "recentPageViewEvents";
- *   final static String SYSTEM = "kafka";
- *
- *   public void process(IncomingMessageEnvelope message, MessageCollector collector,
- *       TaskCoordinator coordinator) {
+ *   public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
  *     PageViewEvent m = (PageViewEvent) message.getValue();
  *     if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) {
- *       collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, TASK_OUTPUT),
- *           message.getKey(), message.getKey(), m));
+ *       collector.send(new OutgoingMessageEnvelope(
+ *          new SystemStream("tracking", "recentPageViewEvent"), message.getKey(), message.getKey(), m));
  *     }
  *   }
  * }
  * }</pre>
- *
- *<p>
- * The example above can be run using an ApplicationRunner:
- * <pre>{@code
- *   public static void main(String[] args) {
- *     CommandLine cmdLine = new CommandLine();
- *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- *     PageViewFilter app = new PageViewFilter();
- *     ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
- *     runner.run();
- *     runner.waitForFinish();
- *   }
- * }</pre>
- *
- * <p>
- * Implementation Notes: {@link TaskApplication} allow users to instantiate {@link org.apache.samza.task.StreamTask} or
- * {@link org.apache.samza.task.AsyncStreamTask} when describing the processing logic. A new {@link TaskApplicationDescriptor }
- * instance will be created and described by the user-defined {@link TaskApplication} when planning the execution.
- * {@link org.apache.samza.task.TaskFactory} is required to be serializable.
- *
- * <p>
- * The user-implemented {@link TaskApplication} class must be a class with proper fully-qualified class name and
- * a default constructor with no parameters to ensure successful instantiation in both local and remote environments.
  */
 @InterfaceStability.Evolving
 public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> {
index b2d54ca..6a4c9fd 100644 (file)
@@ -29,32 +29,42 @@ import org.apache.samza.system.descriptors.SystemDescriptor;
 
 
 /**
- * The interface class to describe the configuration, input and output streams, and processing logic in a
+ * An {@link ApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
+ * processing logic for a {@link org.apache.samza.application.SamzaApplication}.
+ * <p>
+ * This is the base {@link ApplicationDescriptor} and provides functionality common to all
  * {@link org.apache.samza.application.SamzaApplication}.
+ * {@link org.apache.samza.application.StreamApplication#describe} will provide access to a
+ * {@link StreamApplicationDescriptor} with additional functionality for describing High Level API applications.
+ * Similarly, {@link org.apache.samza.application.TaskApplication#describe} will provide access to a
+ * {@link TaskApplicationDescriptor} with additional functionality for describing Low Level API applications.
  * <p>
- * Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for
- * applications written in high-level {@link org.apache.samza.application.StreamApplication} and low-level
- * {@link org.apache.samza.application.TaskApplication} APIs, respectively.
- *
- * @param <S> sub-class of user application descriptor.
+ * Use the {@link ApplicationDescriptor} to set the container scope context factory using
+ * {@link ApplicationDescriptor#withApplicationContainerContextFactory}, and task scope context factory using
+ * {@link ApplicationDescriptor#withApplicationTaskContextFactory}. Please note that the terms {@code container}
+ * and {@code task} here refer to the units of physical and logical parallelism, not the programming API.
  */
 @InterfaceStability.Evolving
 public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
 
   /**
-   * Get the {@link Config} of the application
-   * @return config of the application
+   * Get the configuration for the application.
+   * @return config for the application
    */
   Config getConfig();
 
   /**
-   * Sets the default SystemDescriptor to use for the application. This is equivalent to setting
-   * {@code job.default.system} and its properties in configuration.
+   * Sets the {@link SystemDescriptor} for the default system for the application.
+   * <p>
+   * The default system is used by the framework for creating any internal (e.g., coordinator, changelog, checkpoint)
+   * streams. In an {@link org.apache.samza.application.StreamApplication}, it is also used for creating any
+   * intermediate streams; e.g., those created by the {@link org.apache.samza.operators.MessageStream#partitionBy} and
+   * {@link org.apache.samza.operators.MessageStream#broadcast} operators.
    * <p>
    * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
    *
-   * @param defaultSystemDescriptor the default system descriptor to use
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
+   * @param defaultSystemDescriptor the {@link SystemDescriptor} for the default system for the application
+   * @return this {@link ApplicationDescriptor}
    */
   S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
 
@@ -64,10 +74,11 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
    * context can be accessed through the {@link org.apache.samza.context.Context}.
    * <p>
    * Setting this is optional.
+   * <p>
+   * The provided {@code factory} instance must be {@link java.io.Serializable}.
    *
    * @param factory the {@link ApplicationContainerContextFactory} for this application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
-   * {@link ApplicationContainerContextFactory}
+   * @return this {@link ApplicationDescriptor}
    */
   S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
 
@@ -77,31 +88,37 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
    * accessed through the {@link org.apache.samza.context.Context}.
    * <p>
    * Setting this is optional.
+   * <p>
+   * The provided {@code factory} instance must be {@link java.io.Serializable}.
    *
    * @param factory the {@link ApplicationTaskContextFactory} for this application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
-   * {@link ApplicationTaskContextFactory}
+   * @return this {@link ApplicationDescriptor}
    */
   S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);
 
   /**
    * Sets the {@link ProcessorLifecycleListenerFactory} for this application.
-   *
-   * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
+   * <p>
+   * Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
    * plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
    * the application.
+   * <p>
+   * The provided {@code factory} instance must be {@link java.io.Serializable}.
    *
    * @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
    *                        with callback methods before and after the start/stop of each StreamProcessor in the application
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
+   * @return this {@link ApplicationDescriptor}
    */
   S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
 
   /**
-   * Sets a set of customized {@link MetricsReporterFactory}s in the application
+   * Sets the {@link org.apache.samza.metrics.MetricsReporterFactory}s for creating the
+   * {@link org.apache.samza.metrics.MetricsReporter}s to use for the application.
+   * <p>
+   * The provided {@link MetricsReporterFactory} instances must be {@link java.io.Serializable}.
    *
-   * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
+   * @param reporterFactories a map of {@link org.apache.samza.metrics.MetricsReporter} names to their factories.
+   * @return this {@link ApplicationDescriptor}
    */
   S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
 
index 3a35054..4a77c6c 100644 (file)
@@ -29,7 +29,16 @@ import org.apache.samza.table.Table;
 
 
 /**
- * The interface class to describe a {@link org.apache.samza.application.SamzaApplication} in high-level API in Samza.
+ * A {@link StreamApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
+ * processing logic for a Samza High Level API {@link org.apache.samza.application.StreamApplication}.
+ * <p>
+ * Use the {@link StreamApplicationDescriptor} obtained from
+ * {@link org.apache.samza.application.StreamApplication#describe} to get the {@link MessageStream}s,
+ * {@link OutputStream}s and {@link Table}s corresponding to their respective {@link InputDescriptor}s,
+ * {@link OutputDescriptor}s and {@link TableDescriptor}s.
+ * <p>
+ * Use the {@link MessageStream} API operators to describe the processing logic for the
+ * {@link org.apache.samza.application.StreamApplication}.
  */
 @InterfaceStability.Evolving
 public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
index 4730297..b395a06 100644 (file)
@@ -26,18 +26,28 @@ import org.apache.samza.task.TaskFactory;
 
 
 /**
- *  The interface to describe a {@link org.apache.samza.application.SamzaApplication} that uses low-level API task
- *  for processing.
+ * A {@link TaskApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
+ * processing logic for a Samza Low Level API {@link org.apache.samza.application.TaskApplication}.
+ * <p>
+ * Use the {@link TaskApplicationDescriptor} obtained from {@link org.apache.samza.application.TaskApplication#describe}
+ * to add the {@link InputDescriptor}s, {@link OutputDescriptor}s and {@link TableDescriptor}s for streams and
+ * tables to be used in the task implementation.
+ * <p>
+ * Use {@link #setTaskFactory} to set the factory for the {@link org.apache.samza.task.StreamTask} or
+ * {@link org.apache.samza.task.AsyncStreamTask} implementation that contains the processing logic for
+ * the {@link org.apache.samza.application.TaskApplication}.
  */
 @InterfaceStability.Evolving
 public interface TaskApplicationDescriptor extends ApplicationDescriptor<TaskApplicationDescriptor> {
 
   /**
-   * Sets the {@link TaskFactory} for the user application. The {@link TaskFactory#createInstance()} creates task instance
-   * that implements the main processing logic of the user application.
+   * Sets the {@link org.apache.samza.task.StreamTaskFactory} or {@link org.apache.samza.task.AsyncStreamTaskFactory}
+   * for the {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask} implementation
+   * that contains the processing logic for the {@link org.apache.samza.application.TaskApplication}.
+   * <p>
+   * The provided {@code taskFactory} instance must be serializable.
    *
-   * @param factory the {@link TaskFactory} including the low-level task processing logic. The only allowed task factory
-   *                classes are {@link org.apache.samza.task.StreamTaskFactory} and {@link org.apache.samza.task.AsyncStreamTaskFactory}.
+   * @param factory the {@link TaskFactory} for the Low Level API Task implementation
    */
   void setTaskFactory(TaskFactory factory);
 
index aab8c7f..8ac34a5 100644 (file)
  */
 package org.apache.samza.context;
 
+
 /**
- * An application should implement this to contain any runtime objects required by processing logic which can be shared
- * across all tasks in a container. A single instance of this will be created in each container. Note that if the
- * container moves or the container model changes (e.g. container failure/rebalancing), then this will be recreated.
+ * An {@link ApplicationContainerContext} instance can be used for holding per-container runtime state and objects
+ * and managing their lifecycle. This context is shared across all tasks in the container.
+ * <p>
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationContainerContextFactory}
+ * to provide the {@link ApplicationContainerContextFactory}. Use {@link Context#getApplicationContainerContext()} to
+ * get the created {@link ApplicationContainerContext} instance for the current container.
  * <p>
- * This needs to be created by an implementation of {@link ApplicationContainerContextFactory}. The factory should
- * create the runtime objects contained within this context.
+ * A unique instance of {@link ApplicationContainerContext} is created in each container. If the
+ * container moves or the container model changes (e.g. due to failure or re-balancing), a new instance is created.
  * <p>
- * This is related to {@link ContainerContext} in that they are both associated with the container lifecycle. In order
- * to access this in application code, use {@link Context#getApplicationContainerContext()}. The
- * {@link ContainerContext} is accessible through {@link Context#getContainerContext()}.
+ * Use the {@link ApplicationContainerContextFactory} to create any runtime state and objects, and the
+ * {@link ApplicationContainerContext#start()} and {@link ApplicationContainerContext#stop()} methods to
+ * manage their lifecycle.
  * <p>
- * If it is necessary to have a separate instance per task, then use {@link ApplicationTaskContext} instead.
+ * Use {@link ApplicationTaskContext} to hold unique runtime state and objects for each task within a container.
+ * Use {@link ContainerContext} to access framework-provided context for a container.
  * <p>
- * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments.
+ * Unlike its {@link ApplicationContainerContextFactory}, an implementation does not need to be
+ * {@link java.io.Serializable}.
  */
 public interface ApplicationContainerContext {
   /**
-   * Lifecycle logic which will run after tasks in the container are initialized but before processing begins.
+   * Starts this {@link ApplicationContainerContext} before any tasks in the container are initialized and before
+   * processing begins.
    * <p>
-   * If this throws an exception, then the container will fail to start.
+   * If this throws an exception, the container will fail to start.
    */
   void start();
 
   /**
-   * Lifecycle logic which will run after processing ends but before tasks in the container are closed.
+   * Stops this {@link ApplicationContainerContext} after processing ends and after all tasks in the container
+   * are closed.
    * <p>
-   * If this throws an exception, then the container will fail to fully shut down.
+   * If this throws an exception, the container will fail to fully shut down.
    */
   void stop();
 }
index 074b0b4..a8c9f7c 100644 (file)
@@ -22,24 +22,25 @@ import java.io.Serializable;
 
 
 /**
- * An application should implement this if it has a {@link ApplicationContainerContext} that is needed for
- * initialization.
+ * The factory for creating {@link ApplicationContainerContext} instances for a
+ * {@link org.apache.samza.application.SamzaApplication} during container initialization.
  * <p>
- * This will be called to create an instance of {@link ApplicationContainerContext} during the container initialization
- * stage. At that stage, the framework-provided job-level and container-level contexts are available for creating the
- * {@link ApplicationContainerContext}.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationContainerContextFactory} to
+ * provide the {@link ApplicationContainerContextFactory}. Use {@link Context#getApplicationContainerContext()} to
+ * get the created {@link ApplicationContainerContext} instance for the current container.
  * <p>
- * This is {@link Serializable} because it is specified in the
- * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
- * @param <T> concrete type of {@link ApplicationContainerContext} returned by this factory
+ * The {@link ApplicationContainerContextFactory} implementation must be {@link Serializable}.
+ *
+ * @param <T> concrete type of {@link ApplicationContainerContext} created by this factory
  */
 public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> extends Serializable {
+
   /**
-   * Create an instance of the application-defined {@link ApplicationContainerContext}.
+   * Creates an instance of the application-defined {@link ApplicationContainerContext}.
    *
-   * @param jobContext framework-provided job context used for building {@link ApplicationContainerContext}
-   * @param containerContext framework-provided container context used for building {@link ApplicationContainerContext}
-   * @return new instance of the application-defined {@link ApplicationContainerContext}
+   * @param jobContext framework-provided job context
+   * @param containerContext framework-provided container context
+   * @return new instance of the application-defined {@link ApplicationContainerContext}
    */
   T create(JobContext jobContext, ContainerContext containerContext);
 }
index 6afbf23..a4236bf 100644 (file)
  */
 package org.apache.samza.context;
 
+
 /**
- * An application should implement this to contain any runtime objects required by processing logic which cannot be
- * shared across tasks. A new instance of this will be created for each task.
+ * An {@link ApplicationTaskContext} instance can be used for holding per-task runtime state and objects and managing
+ * their lifecycle in an {@link org.apache.samza.application.SamzaApplication}
  * <p>
- * This needs to be created by an implementation of {@link ApplicationTaskContextFactory}. The factory should create
- * the runtime objects contained within this context.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationTaskContextFactory}
+ * to provide the {@link ApplicationTaskContextFactory}. Use {@link Context#getApplicationTaskContext()} to get
+ * the created {@link ApplicationTaskContext} instance for the current task.
  * <p>
- * This is related to {@link TaskContext} in that they are both associated with a task lifecycle. In order to access
- * this in application code, use {@link Context#getApplicationTaskContext()}. The {@link TaskContext} is accessible
- * through {@link Context#getTaskContext()}.
+ * A unique instance of {@link ApplicationTaskContext} is created for each task in a container.
+ * Use the {@link ApplicationTaskContextFactory} to create any runtime state and objects, and the
+ * {@link ApplicationTaskContext#start()} and {@link ApplicationTaskContext#stop()} methods to manage their lifecycle.
  * <p>
- * If it is possible to share an instance of this across tasks in a container, then use
- * {@link ApplicationContainerContext} instead.
+ * Use {@link ApplicationContainerContext} to hold runtime state and objects shared across all tasks within a container.
+ * Use {@link TaskContext} to access framework-provided context for a task.
  * <p>
- * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments.
+ * Unlike its {@link ApplicationTaskContextFactory}, an implementation does not need to be
+ * {@link java.io.Serializable}.
  */
 public interface ApplicationTaskContext {
+
   /**
-   * Lifecycle logic which will run after tasks are initialized but before processing begins.
+   * Starts this {@link ApplicationTaskContext} after its task is initialized but before any messages are processed.
    * <p>
-   * If this throws an exception, then the container will fail to start.
+   * If this throws an exception, the container will fail to start.
    */
   void start();
 
   /**
-   * Lifecycle logic which will run after processing ends but before tasks are closed.
+   * Stops this {@link ApplicationTaskContext} after processing ends but before its task is closed.
    * <p>
-   * If this throws an exception, then the container will fail to fully shut down.
+   * If this throws an exception, the container will fail to fully shut down.
    */
   void stop();
 }
index 619bbc7..c00935f 100644 (file)
@@ -22,27 +22,27 @@ import java.io.Serializable;
 
 
 /**
- * An application should implement this if it has a {@link ApplicationTaskContext} that is needed for
- * initialization. This will be used to create instance(s) of that {@link ApplicationTaskContext}.
+ * The factory for creating {@link ApplicationTaskContext} instances for a
+ * {@link org.apache.samza.application.SamzaApplication}during task initialization.
  * <p>
- * This will be called to create an instance of {@link ApplicationTaskContext} during the initialization stage of each
- * task. At that stage, the framework-provided job-level, container-level, and task-level contexts are available for
- * creating the {@link ApplicationTaskContext}. Also, the application-defined container-level context is available.
+ * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationTaskContextFactory} to
+ * provide the {@link ApplicationTaskContextFactory}. Use {@link Context#getApplicationTaskContext()} to
+ * get the created {@link ApplicationTaskContext} instance for the current task.
  * <p>
- * This is {@link Serializable} because it is specified in the
- * {@link org.apache.samza.application.descriptors.ApplicationDescriptor}.
- * @param <T> concrete type of {@link ApplicationTaskContext} returned by this factory
+ * The {@link ApplicationTaskContextFactory} implementation must be {@link Serializable}.
+ *
+ * @param <T> concrete type of {@link ApplicationTaskContext} created by this factory
  */
 public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> extends Serializable {
+
   /**
-   * Create an instance of the application-defined {@link ApplicationTaskContext}.
+   * Creates an instance of the application-defined {@link ApplicationTaskContext}.
    *
-   * @param jobContext framework-provided job context used for building {@link ApplicationTaskContext}
-   * @param containerContext framework-provided container context used for building {@link ApplicationTaskContext}
-   * @param taskContext framework-provided task context used for building {@link ApplicationTaskContext}
-   * @param applicationContainerContext application-provided container context used for building
-   * {@link ApplicationTaskContext}
-   * @return new instance of the application-defined {@link ApplicationContainerContext}
+   * @param jobContext framework-provided job context
+   * @param containerContext framework-provided container context
+   * @param taskContext framework-provided task context
+   * @param applicationContainerContext application-defined container context
+   * @return a new instance of the application-defined {@link ApplicationTaskContext}
    */
   T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
       ApplicationContainerContext applicationContainerContext);
index 51d7918..97c5f53 100644 (file)
@@ -23,24 +23,25 @@ import org.apache.samza.metrics.MetricsRegistry;
 
 
 /**
- * Contains information at container granularity, provided by the Samza framework, to be used to instantiate an
- * application at runtime.
+ * The framework-provided context for the current container.
  * <p>
- * Note that application-defined container-level context is accessible through
- * {@link ApplicationContainerContext}.
+ * Use {@link ApplicationContainerContext} for the application-defined context for the current container.
  */
 public interface ContainerContext {
+
   /**
-   * Returns the {@link ContainerModel} associated with this container. This contains information like the id and the
-   * associated {@link org.apache.samza.job.model.TaskModel}s.
-   * @return {@link ContainerModel} associated with this container
+   * Gets the {@link ContainerModel} for this container, which contains this container's id and
+   * its {@link org.apache.samza.job.model.TaskModel}.
+   *
+   * @return the {@link ContainerModel} for this container
    */
   ContainerModel getContainerModel();
 
   /**
-   * Returns the {@link MetricsRegistry} for this container. Metrics built using this registry will be associated with
-   * the container.
-   * @return {@link MetricsRegistry} for this container
+   * Gets the {@link MetricsRegistry} for this container, which can be used to register metrics that are
+   * reported per container.
+   *
+   * @return the {@link MetricsRegistry} for this container
    */
   MetricsRegistry getContainerMetricsRegistry();
 }
index bfe66d3..e111127 100644 (file)
 package org.apache.samza.context;
 
 /**
- * Container object for all context provided to instantiate an application at runtime.
+ * A holder for all framework and application defined contexts at runtime.
  */
 public interface Context {
   /**
-   * Returns the framework-provided context for the overall job that is being run.
-   * @return framework-provided job context
+   * Gets the framework-provided context for the job.
+   *
+   * @return the framework-provided job context
    */
   JobContext getJobContext();
 
   /**
-   * Returns the framework-provided context for the container that this is in.
+   * Gets the framework-provided context for the current container. This context is shared by all tasks within
+   * the container.
    * <p>
-   * Note that this is not the application-defined container context. Use
-   * {@link Context#getApplicationContainerContext()} to get the application-defined container context.
-   * @return framework-provided container context
+   * Use {@link #getApplicationContainerContext()} to get the application-defined container context.
+   *
+   * @return the framework-provided container context
    */
   ContainerContext getContainerContext();
 
   /**
-   * Returns the framework-provided context for the task that that this is in.
+   * Gets the framework-provided context for the current task.
    * <p>
-   * Note that this is not the application-defined task context. Use {@link Context#getApplicationTaskContext()}
-   * to get the application-defined task context.
-   * @return framework-provided task context
+   * Use {@link #getApplicationTaskContext()} to get the application-defined task context.
+   *
+   * @return the framework-provided task context
    */
   TaskContext getTaskContext();
 
   /**
-   * Returns the application-defined container context object specified by the
-   * {@link ApplicationContainerContextFactory}. This is shared across all tasks in the container, but not across
-   * containers.
+   * Gets the application-defined context for the current container. This context is shared by all tasks within
+   * the container.
    * <p>
-   * In order to use this in application code, it should be casted to the concrete type that corresponds to the
-   * {@link ApplicationContainerContextFactory}.
+   * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationContainerContextFactory}
+   * to provide a factory for this context. Cast the returned context to the concrete implementation type to use it.
    * <p>
-   * Note that this is not the framework-provided container context. Use {@link Context#getContainerContext()} to get
-   * the framework-provided container context.
-   * @return application-defined container context
-   * @throws IllegalStateException if no context could be built (e.g. no factory provided)
+   * Use {@link #getContainerContext()} to get the framework-provided container context.
+   *
+   * @return the application-defined container context
+   * @throws IllegalStateException if no {@link ApplicationContainerContextFactory} was was provided for the application
    */
   ApplicationContainerContext getApplicationContainerContext();
 
   /**
-   * Returns the application-defined task context object specified by the {@link ApplicationTaskContextFactory}.
-   * Each task will have a separate instance of this.
+   * Gets the application-defined task context for the current task. This context is unique to this task.
    * <p>
-   * In order to use this in application code, it should be casted to the concrete type that corresponds to the
-   * {@link ApplicationTaskContextFactory}.
+   * Use {@link org.apache.samza.application.descriptors.ApplicationDescriptor#withApplicationTaskContextFactory}
+   * to provide a factory for this context. Cast the returned context to the concrete implementation type to use it.
    * <p>
-   * Note that this is not the framework-provided task context. Use {@link Context#getTaskContext()} to get the
-   * framework-provided task context.
-   * @return application-defined task context
-   * @throws IllegalStateException if no context could be built (e.g. no factory provided)
+   * Use {@link Context#getTaskContext()} to get the framework-provided task context.
+   *
+   * @return the application-defined task context
+   * @throws IllegalStateException if no {@link ApplicationTaskContextFactory} was provided for the application
    */
   ApplicationTaskContext getApplicationTaskContext();
 }
index 239a011..8e41980 100644 (file)
@@ -22,26 +22,28 @@ import org.apache.samza.config.Config;
 
 
 /**
- * Contains information at job granularity, provided by the Samza framework, to be used to instantiate an application at
- * runtime.
+ * The framework-provided context for the job.
  */
 public interface JobContext {
+
   /**
-   * Returns the final configuration for this job.
-   * @return configuration for this job
+   * Gets the final configuration for this job.
+   *
+   * @return the configuration for this job
    */
   Config getConfig();
 
   /**
-   * Returns the name of the job.
-   * @return name of the job
-   * @throws org.apache.samza.SamzaException if the job name was not configured
+   * Gets the name of the job.
+   *
+   * @return the name of this job
    */
   String getJobName();
 
   /**
-   * Returns the instance id for this instance of this job.
-   * @return instance id for the job
+   * Gets the id for this job.
+   *
+   * @return the id for this job
    */
   String getJobId();
 }
index d29f6a5..3a5333c 100644 (file)
@@ -28,54 +28,65 @@ import org.apache.samza.table.Table;
 
 
 /**
- * Contains information at task granularity, provided by the Samza framework, to be used to instantiate an application
- * at runtime.
+ * The framework-provided context for the current task.
  * <p>
- * Note that application-defined task-level context is accessible through {@link ApplicationTaskContext}.
+ * Use {@link ApplicationTaskContext} for the application-defined context for the current task.
  */
 public interface TaskContext {
+
   /**
-   * Returns the {@link TaskModel} associated with this task. This contains information like the task name and
-   * associated {@link SystemStreamPartition}s.
-   * @return {@link TaskModel} associated with this task
+   * Gets the {@link TaskModel} for this task, which contains this task's name and its {@link SystemStreamPartition}s.
+   *
+   * @return the {@link TaskModel} for this task
    */
   TaskModel getTaskModel();
 
   /**
-   * Returns the {@link MetricsRegistry} for this task. Metrics built using this registry will be associated with the
-   * task.
-   * @return {@link MetricsRegistry} for this task
+   * Gets the {@link MetricsRegistry} for this task, which can be used to register metrics that are reported per task.
+   *
+   * @return the {@link MetricsRegistry} for this task
    */
   MetricsRegistry getTaskMetricsRegistry();
 
   /**
-   * Returns the {@link KeyValueStore} corresponding to the {@code storeName}. In application code, it is recommended to
-   * cast the resulting stores to {@link KeyValueStore}s with the correct concrete type parameters.
-   * @param storeName name of the {@link KeyValueStore} to get
-   * @return {@link KeyValueStore} corresponding to the {@code storeName}
+   * Gets the {@link KeyValueStore} associated with {@code storeName} for this task.
+   * <p>
+   * The returned store should be cast with the concrete type parameters based on the configured store serdes.
+   * E.g., if using string key and integer value serde, it should be cast to a {@code KeyValueStore<String, Integer>}.
+   *
+   * @param storeName name of the {@link KeyValueStore} to get for this task
+   * @return the {@link KeyValueStore} associated with {@code storeName} for this task
    * @throws IllegalArgumentException if there is no store associated with {@code storeName}
    */
   KeyValueStore<?, ?> getStore(String storeName);
 
   /**
-   * Returns the {@link Table} corresponding to the {@code tableId}. In application code, it is recommended to cast this
-   * to the resulting tables to {@link Table}s with the correct concrete type parameters.
+   * Gets the {@link Table} corresponding to the {@code tableId} for this task.
+   *
+   * The returned table should be cast with the concrete type parameters based on the configured table serdes, and
+   * whether it is {@link org.apache.samza.table.ReadWriteTable} or {@link org.apache.samza.table.ReadableTable}.
+   * E.g., if using string key and integer value serde for a writable table, it should be cast to a
+   * {@code ReadWriteTable<String, Integer>}.
+   *
    * @param tableId id of the {@link Table} to get
-   * @return {@link Table} corresponding to the {@code tableId}
+   * @return the {@link Table} associated with {@code tableId} for this task
    * @throws IllegalArgumentException if there is no table associated with {@code tableId}
    */
   Table<?> getTable(String tableId);
 
   /**
-   * Returns a task-level {@link CallbackScheduler} which can be used to delay execution of some logic.
-   * @return {@link CallbackScheduler} for this task
+   * Gets the {@link CallbackScheduler} for this task, which can be used to schedule a callback to be executed
+   * at a future time.
+   *
+   * @return the {@link CallbackScheduler} for this task
    */
   CallbackScheduler getCallbackScheduler();
 
   /**
-   * Set the starting offset for the given {@link SystemStreamPartition}. Offsets can only be set for a
-   * {@link SystemStreamPartition} assigned to this task. The {@link SystemStreamPartition}s assigned to this task can
-   * be accessed through {@link TaskModel#getSystemStreamPartitions()} for the {@link TaskModel} obtained by calling
+   * Sets the starting offset for the given {@link SystemStreamPartition}.
+   * <p> Offsets can only be set for a {@link SystemStreamPartition} assigned to this task.
+   * The {@link SystemStreamPartition}s assigned to this task can be accessed through
+   * {@link TaskModel#getSystemStreamPartitions()} for the {@link TaskModel} obtained by calling
    * {@link #getTaskModel()}. Trying to set the offset for any other partition will have no effect.
    *
    * NOTE: this feature is experimental, and the API may change in a future release.
index aa5c8d2..e08fa09 100644 (file)
@@ -21,16 +21,16 @@ package org.apache.samza.system.descriptors;
 import org.apache.samza.serializers.Serde;
 
 /**
- * A descriptor for a generic input stream.
+ * A {@link GenericInputDescriptor} can be used for specifying Samza and system-specific properties of
+ * input streams.
  * <p>
- * An instance of this descriptor may be obtained from an appropriately configured
- * {@link GenericSystemDescriptor}.
+ * If the system provides its own system and stream descriptor implementations, use them instead.
+ * Otherwise, use this {@link GenericInputDescriptor} to specify Samza-specific properties of the stream,
+ * and {@link #withStreamConfigs} to specify additional system specific properties.
  * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream.
- * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
+ * Use {@link GenericSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index 1d81525..7302c35 100644 (file)
@@ -21,16 +21,16 @@ package org.apache.samza.system.descriptors;
 import org.apache.samza.serializers.Serde;
 
 /**
- * A descriptor for a generic output stream.
+ * A {@link GenericOutputDescriptor} can be used for specifying Samza and system-specific properties of
+ * output streams.
  * <p>
- * An instance of this descriptor may be obtained from an appropriately configured
- * {@link GenericSystemDescriptor}.
+ * If the system provides its own system and stream descriptor implementations, use them instead.
+ * Otherwise, use this {@link GenericOutputDescriptor} to specify Samza-specific properties of the stream,
+ * and {@link #withStreamConfigs} to specify additional system specific properties.
  * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream.
- * Additional system stream specific properties may be provided using {@link #withStreamConfigs}
+ * Use {@link GenericSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index eb86877..4884cd6 100644 (file)
@@ -22,13 +22,16 @@ package org.apache.samza.system.descriptors;
 import org.apache.samza.serializers.Serde;
 
 /**
- * A descriptor for a generic system.
+ * A {@link GenericSystemDescriptor} can be used for specifying Samza and system-specific properties of an
+ * input/output system. It can also be used for obtaining {@link GenericInputDescriptor}s and
+ * {@link GenericOutputDescriptor}s, which can be used for specifying any Samza and system-specific properties
+ * of input/output streams.
  * <p>
- * If the system being used provides its own system and stream descriptor implementations, they should be used instead.
- * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system.
- * Additional system specific properties may be provided using {@link #withSystemConfigs}
+ * If the system provides its own system and stream descriptor implementations, use them instead.
+ * Otherwise, use this {@link GenericSystemDescriptor} to specify Samza-specific properties of the system,
+ * and {@link #withSystemConfigs} to specify additional system specific properties.
  * <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
  */
 public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor>
     implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
index fd7a50c..2c6f88b 100644 (file)
@@ -26,9 +26,13 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemStreamMetadata.OffsetType;
 
 /**
- * The base descriptor for an input stream. Allows setting properties that are common to all input streams.
+ * An {@link InputDescriptor} can be used for specifying Samza and system-specific properties of input streams.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for an input stream. Use a system-specific input descriptor (e.g. KafkaInputDescriptor)
+ * obtained from its system descriptor (e.g. KafkaSystemDescriptor) if one is available. Otherwise use the
+ * {@link GenericInputDescriptor} obtained from a {@link GenericSystemDescriptor}.
  *
  * @param <StreamMessageType> type of messages in this stream.
  * @param <SubClass> type of the concrete sub-class
index 898be1e..264c6da 100644 (file)
@@ -21,9 +21,13 @@ package org.apache.samza.system.descriptors;
 import org.apache.samza.serializers.Serde;
 
 /**
- * The base descriptor for an output stream. Allows setting properties that are common to all output streams.
+ * An {@link OutputDescriptor} can be used for specifying Samza and system-specific properties of output streams.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for an output stream. Use a system-specific input descriptor (e.g. KafkaOutputDescriptor)
+ * obtained from its system descriptor (e.g. KafkaSystemDescriptor) if one is available. Otherwise use the
+ * {@link GenericOutputDescriptor} obtained from a {@link GenericSystemDescriptor}.
  *
  * @param <StreamMessageType> type of messages in this stream.
  * @param <SubClass> type of the concrete sub-class
index e8e586f..43cab8f 100644 (file)
@@ -29,9 +29,14 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.serializers.Serde;
 
 /**
- * The base descriptor for an input or output stream. Allows setting properties that are common to all streams.
+ * A {@link StreamDescriptor} can be used for specifying Samza and system-specific properties of input/output streams.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptors.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptors.
+ * <p>
+ * This is the base descriptor for an input/output stream. Use a system-specific input/output descriptor
+ * (e.g. KafkaInputDescriptor) obtained from its system descriptor (e.g. KafkaSystemDescriptor) if one is available.
+ * Otherwise use the {@link GenericInputDescriptor} and {@link GenericOutputDescriptor} obtained from a
+ * {@link GenericSystemDescriptor}.
  *
  * @param <StreamMessageType> type of messages in this stream.
  * @param <SubClass> type of the concrete sub-class
index 9db2544..813deb1 100644 (file)
@@ -30,9 +30,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The base descriptor for a system. Allows setting properties that are common to all systems.
+ * A {@link SystemDescriptor} can be used for specifying Samza and system-specific properties of an input/output system.
+ * It can also be used for obtaining {@link InputDescriptor}s and {@link OutputDescriptor}s, which can be used for
+ * specifying Samza and system-specific properties of input/output streams.
  * <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for a system. Use a system-specific descriptor (e.g. KafkaSystemDescriptor) if one
+ * is available. Otherwise use the {@link GenericSystemDescriptor}.
  * <p>
  * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An
  * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message
index 767e176..76ad460 100644 (file)
@@ -20,9 +20,28 @@ package org.apache.samza.table;
 
 import org.apache.samza.annotation.InterfaceStability;
 
+
 /**
  *
- * Marker interface for a table.
+ * A {@link Table} is an abstraction for data sources that support random access by key. It is an
+ * evolution of the existing {@link org.apache.samza.storage.kv.KeyValueStore} API. It offers support for
+ * both local and remote data sources and composition through hybrid tables. For remote data sources,
+ * a {@code RemoteTable} provides optimized access with caching, rate-limiting, and retry support.
+ * <p>
+ * Depending on the implementation, a {@link Table} can be a {@link ReadableTable} or a {@link ReadWriteTable}.
+ * <p>
+ * Use a {@link org.apache.samza.table.descriptors.TableDescriptor} to specify the properties of a {@link Table}.
+ * For High Level API {@link org.apache.samza.application.StreamApplication}s, use
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#getTable} to obtain
+ * the {@link org.apache.samza.table.Table} instance for the descriptor that can be used with the
+ * {@link org.apache.samza.operators.MessageStream} operators like
+ * {@link org.apache.samza.operators.MessageStream#sendTo(Table)}.
+ * Alternatively, use {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.operators.functions.InitableFunction#init} to get the table instance for use within
+ * operator functions.
+ * For Low Level API {@link org.apache.samza.application.TaskApplication}s, use
+ * {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.task.InitableTask#init} to get the table instance for use within the Task.
  *
  * @param <R> the type of records in the table
  */
index 5d7b89e..f1118eb 100644 (file)
@@ -20,25 +20,33 @@ package org.apache.samza.table.descriptors;
 
 import org.apache.samza.annotation.InterfaceStability;
 
+
 /**
- * User facing class to collect metadata that fully describes a
- * Samza table. This interface should be implemented by concrete table implementations.
+ * A {@link TableDescriptor} can be used for specifying Samza and implementation-specific properties of a
+ * {@link org.apache.samza.table.Table}.
  * <p>
- * Typical user code should look like the following, notice <code>withConfig()</code>
- * is defined in this class and the rest in subclasses.
- *
- * <pre>
- * {@code
- * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl",
+ * Table properties provided in configuration override corresponding properties specified using a descriptor.
+ * <p>
+ * This is the base descriptor for a table. Use a implementation-specific descriptor (e.g. RocksDBTableDescriptor) to
+ * use it in the application. For example:
+ * <pre>{@code
+ * RocksDbTableDescriptor tableDescriptor = new RocksDbTableDescriptor("table",
  *         KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
  *     .withBlockSize(1024)
  *     .withConfig("some-key", "some-value");
  * }
  * </pre>
-
- * Once constructed, a table descriptor can be registered with the system. Internally,
- * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec},
- * which is used to track tables internally.
+ * For High Level API {@link org.apache.samza.application.StreamApplication}s, use
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptor#getTable(TableDescriptor)} to obtain
+ * the corresponding {@link org.apache.samza.table.Table} instance that can be used with the
+ * {@link org.apache.samza.operators.MessageStream} operators like
+ * {@link org.apache.samza.operators.MessageStream#sendTo(org.apache.samza.table.Table)}.
+ * Alternatively, use {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.operators.functions.InitableFunction#init} to get the table instance for use within
+ * operator functions.
+ * For Low Level API {@link org.apache.samza.application.TaskApplication}s, use
+ * {@link org.apache.samza.context.TaskContext#getTable(String)} in
+ * {@link org.apache.samza.task.InitableTask#init} to get the table instance for use within the Task.
  *
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
@@ -48,13 +56,14 @@ import org.apache.samza.annotation.InterfaceStability;
 public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
 
   /**
-   * Get the Id of the table
-   * @return Id of the table
+   * Get the id of the table
+   * @return id of the table
    */
   String getTableId();
 
   /**
    * Add a configuration entry for the table
+   *
    * @param key the key
    * @param value the value
    * @return this table descriptor instance
index 8443d20..f9349bd 100644 (file)
@@ -23,9 +23,9 @@ import org.apache.samza.annotation.InterfaceStability;
 
 
 /**
- * The interface for all task factories (i.e. {@link StreamTaskFactory} and {@link AsyncStreamTaskFactory}
+ * The base interface for all task factories (i.e. {@link StreamTaskFactory} and {@link AsyncStreamTaskFactory})
  *
- * @param <T> the type of task instances
+ * @param <T> the type of task instances created by the factory
  */
 @InterfaceStability.Stable
 public interface TaskFactory<T> extends Serializable {
index df22269..c8cc36b 100644 (file)
@@ -32,11 +32,12 @@ import org.apache.samza.system.eventhub.EventHubConfig;
 
 
 /**
- * A descriptor for the Event Hubs output stream
+ * A {@link EventHubsInputDescriptor} can be used for specifying Samza and EventHubs-specific properties of EventHubs
+ * input streams.
  * <p>
- * An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ * Use {@link EventHubsSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream
  */
index 95f7e42..b3e1c59 100644 (file)
@@ -30,12 +30,14 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 
+
 /**
- * A descriptor for an Event Hubs output stream
+ * A {@link EventHubsOutputDescriptor} can be used for specifying Samza and EventHubs-specific properties of EventHubs
+ * output streams.
  * <p>
- * An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
+ * Use {@link EventHubsSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream
  */
index feffd87..2084018 100644 (file)
@@ -32,9 +32,12 @@ import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.Partitio
 
 
 /**
- * A descriptor for a Event Hubs system.
+ * A {@link EventHubsSystemDescriptor} can be used for specifying Samza and EventHubs-specific properties of a EventHubs
+ * input/output system. It can also be used for obtaining {@link EventHubsInputDescriptor}s and
+ * {@link EventHubsOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
+ * EventHubs input/output streams.
  * <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
  */
 public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
   private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
index 2b29a2b..e9cc302 100644 (file)
@@ -32,7 +32,7 @@ public final class LegacyTaskApplication implements TaskApplication {
   }
 
   @Override
-  public void describe(TaskApplicationDescriptor appDesc) {
-    appDesc.setTaskFactory(TaskFactoryUtil.getTaskFactory(taskClassName));
+  public void describe(TaskApplicationDescriptor appDescriptor) {
+    appDescriptor.setTaskFactory(TaskFactoryUtil.getTaskFactory(taskClassName));
   }
 }
\ No newline at end of file
index 8b96c8a..6baa54e 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
  */
 public class MockStreamApplication implements StreamApplication {
   @Override
-  public void describe(StreamApplicationDescriptor appSpec) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
 
   }
 }
index ab91cee..f620ece 100644 (file)
@@ -91,7 +91,7 @@ public class TestApplicationUtil {
    */
   public static class MockTaskApplication implements TaskApplication {
     @Override
-    public void describe(TaskApplicationDescriptor appSpec) {
+    public void describe(TaskApplicationDescriptor appDescriptor) {
 
     }
   }
index 6d017cb..299d631 100644 (file)
@@ -818,7 +818,7 @@ public class TestExecutionPlanner {
   public static class MockTaskApplication implements SamzaApplication {
 
     @Override
-    public void describe(ApplicationDescriptor appDesc) {
+    public void describe(ApplicationDescriptor appDescriptor) {
 
     }
   }
index fb279ab..d9477e5 100644 (file)
@@ -27,12 +27,14 @@ import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.system.descriptors.InputTransformer;
 import org.apache.samza.serializers.Serde;
 
+
 /**
- * A descriptor for a kafka input stream.
+ * A {@link KafkaInputDescriptor} can be used for specifying Samza and Kafka-specific properties of Kafka
+ * input streams.
  * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
+ * Use {@link KafkaSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index f13352c..dcc15a8 100644 (file)
@@ -22,12 +22,14 @@ import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.serializers.Serde;
 
+
 /**
- * A descriptor for a kafka output stream.
+ * A {@link KafkaOutputDescriptor} can be used for specifying Samza and Kafka-specific properties of Kafka
+ * output streams.
  * <p>
- * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}.
+ * Use {@link KafkaSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
  * <p>
- * Stream properties provided in configuration override corresponding properties configured using a descriptor.
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
  *
  * @param <StreamMessageType> type of messages in this stream.
  */
index 6fb8c1c..091c21a 100644 (file)
@@ -34,9 +34,12 @@ import org.apache.samza.system.kafka.KafkaSystemFactory;
 
 
 /**
- * A descriptor for a Kafka system.
+ * A {@link KafkaSystemDescriptor} can be used for specifying Samza and Kafka-specific properties of a Kafka
+ * input/output system. It can also be used for obtaining {@link KafkaInputDescriptor}s and
+ * {@link KafkaOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
+ * Kafka input/output streams.
  * <p>
- * System properties provided in configuration override corresponding properties configured using a descriptor.
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
  */
 @SuppressWarnings("unchecked")
 public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescriptor>
index 8265414..47d6cf0 100644 (file)
@@ -47,10 +47,10 @@ public class SamzaSqlApplication implements StreamApplication {
   private AtomicInteger queryId = new AtomicInteger(0);
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     try {
       // TODO: Introduce an API to return a dsl string containing one or more sql statements.
-      List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDesc.getConfig());
+      List<String> dslStmts = SamzaSqlDslConverter.fetchSqlFromConfig(appDescriptor.getConfig());
 
       Map<Integer, TranslatorContext> translatorContextMap = new HashMap<>();
 
@@ -59,21 +59,21 @@ public class SamzaSqlApplication implements StreamApplication {
       Set<String> outputSystemStreams = new HashSet<>();
 
       Collection<RelRoot> relRoots =
-          SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDesc.getConfig(),
+          SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDescriptor.getConfig(),
               inputSystemStreams, outputSystemStreams);
 
       // 2. Populate configs
       SamzaSqlApplicationConfig sqlConfig =
-          new SamzaSqlApplicationConfig(appDesc.getConfig(), inputSystemStreams, outputSystemStreams);
+          new SamzaSqlApplicationConfig(appDescriptor.getConfig(), inputSystemStreams, outputSystemStreams);
 
       // 3. Translate Calcite plan to Samza stream operators
-      QueryTranslator queryTranslator = new QueryTranslator(appDesc, sqlConfig);
+      QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig);
       SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
       Map<String, SamzaRelConverter> converters = sqlConfig.getSamzaRelConverters();
       for (RelRoot relRoot : relRoots) {
         LOG.info("Translating relRoot {} to samza stream graph", relRoot);
         int qId = queryId.incrementAndGet();
-        TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, converters);
+        TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext, converters);
         translatorContextMap.put(qId, translatorContext);
         queryTranslator.translate(relRoot, translatorContext, qId);
       }
@@ -85,7 +85,7 @@ public class SamzaSqlApplication implements StreamApplication {
        * container, so it does not need to be serialized. Therefore, the translatorContext is recreated in each container
        * and does not need to be serialized.
        */
-      appDesc.withApplicationTaskContextFactory((jobContext,
+      appDescriptor.withApplicationTaskContextFactory((jobContext,
           containerContext,
           taskContext,
           applicationContainerContext) ->
index ba9c8b3..766b529 100644 (file)
@@ -55,7 +55,7 @@ public class AppWithGlobalConfigExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -65,15 +65,15 @@ public class AppWithGlobalConfigExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
-    appDesc.getInputStream(inputStreamDescriptor)
+    appDescriptor.getInputStream(inputStreamDescriptor)
         .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1,
             null, null)
             .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
             .setAccumulationMode(AccumulationMode.DISCARDING), "window1")
         .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m)))
-        .sendTo(appDesc.getOutputStream(outputStreamDescriptor));
+        .sendTo(appDescriptor.getOutputStream(outputStreamDescriptor));
 
-    appDesc.withMetricsReporterFactories(new HashMap<>());
+    appDescriptor.withMetricsReporterFactories(new HashMap<>());
   }
 
   class PageViewEvent {
index 7721d44..bf641ce 100644 (file)
@@ -50,7 +50,7 @@ public class BroadcastExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
     KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent =
@@ -62,10 +62,10 @@ public class BroadcastExample implements StreamApplication {
     KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 =
         trackingSystem.getOutputDescriptor("outStream3", serde);
 
-    MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageViewEvent);
-    inputStream.filter(m -> m.key.equals("key1")).sendTo(appDesc.getOutputStream(outStream1));
-    inputStream.filter(m -> m.key.equals("key2")).sendTo(appDesc.getOutputStream(outStream2));
-    inputStream.filter(m -> m.key.equals("key3")).sendTo(appDesc.getOutputStream(outStream3));
+    MessageStream<KV<String, PageViewEvent>> inputStream = appDescriptor.getInputStream(pageViewEvent);
+    inputStream.filter(m -> m.key.equals("key1")).sendTo(appDescriptor.getOutputStream(outStream1));
+    inputStream.filter(m -> m.key.equals("key2")).sendTo(appDescriptor.getOutputStream(outStream2));
+    inputStream.filter(m -> m.key.equals("key3")).sendTo(appDescriptor.getOutputStream(outStream3));
   }
 
   class PageViewEvent {
index 4923b7d..444039a 100644 (file)
@@ -58,7 +58,7 @@ public class KeyValueStoreExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -68,9 +68,9 @@ public class KeyValueStoreExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
 
-    appDesc.withDefaultSystem(trackingSystem);
-    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
-    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
+    appDescriptor.withDefaultSystem(trackingSystem);
+    MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDescriptor.getOutputStream(outputStreamDescriptor);
 
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
index ac0db36..e3eee23 100644 (file)
@@ -49,7 +49,7 @@ public class MergeExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
@@ -64,8 +64,8 @@ public class MergeExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("mergedStream", serde);
 
     MessageStream
-        .mergeAll(ImmutableList.of(appDesc.getInputStream(isd1), appDesc.getInputStream(isd2), appDesc.getInputStream(isd3)))
-        .sendTo(appDesc.getOutputStream(osd));
+        .mergeAll(ImmutableList.of(appDescriptor.getInputStream(isd1), appDescriptor.getInputStream(isd2), appDescriptor.getInputStream(isd3)))
+        .sendTo(appDescriptor.getOutputStream(osd));
   }
 
   class PageViewEvent {
index ea38984..54cced1 100644 (file)
@@ -50,7 +50,7 @@ public class OrderShipmentJoinExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<OrderRecord> orderStreamDescriptor =
@@ -61,12 +61,12 @@ public class OrderShipmentJoinExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("fulfilledOrders",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
 
-    appDesc.getInputStream(orderStreamDescriptor)
-        .join(appDesc.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
+    appDescriptor.getInputStream(orderStreamDescriptor)
+        .join(appDescriptor.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(),
             new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
             Duration.ofMinutes(1), "join")
         .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
-        .sendTo(appDesc.getOutputStream(fulfilledOrdersStreamDescriptor));
+        .sendTo(appDescriptor.getOutputStream(fulfilledOrdersStreamDescriptor));
 
   }
 
index 1476c81..5fe7b9c 100644 (file)
@@ -58,7 +58,7 @@ public class PageViewCounterExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -68,8 +68,8 @@ public class PageViewCounterExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
-    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
-    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDesc.getOutputStream(outputStreamDescriptor);
+    MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = appDescriptor.getOutputStream(outputStreamDescriptor);
 
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
index 2cf3ac3..19403b0 100644 (file)
@@ -54,7 +54,7 @@ public class RepartitionExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -64,9 +64,9 @@ public class RepartitionExample implements StreamApplication {
         trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
 
-    appDesc.withDefaultSystem(trackingSystem);
-    MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
-    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDesc.getOutputStream(outputStreamDescriptor);
+    appDescriptor.withDefaultSystem(trackingSystem);
+    MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = appDescriptor.getOutputStream(outputStreamDescriptor);
 
     pageViewEvents
         .partitionBy(pve -> pve.memberId, pve -> pve,
index 8f6c6f8..44528e6 100644 (file)
@@ -60,18 +60,18 @@ public class TaskApplicationExample implements TaskApplication {
   }
 
   @Override
-  public void describe(TaskApplicationDescriptor appDesc) {
+  public void describe(TaskApplicationDescriptor appDescriptor) {
     // add input and output streams
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("tracking");
     KafkaInputDescriptor<String> isd = ksd.getInputDescriptor("myinput", new StringSerde());
     KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor("myout", new StringSerde());
     TableDescriptor td = new RocksDbTableDescriptor("mytable");
 
-    appDesc.addInputStream(isd);
-    appDesc.addOutputStream(osd);
-    appDesc.addTable(td);
+    appDescriptor.addInputStream(isd);
+    appDescriptor.addOutputStream(osd);
+    appDescriptor.addTable(td);
     // create the task factory based on configuration
-    appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+    appDescriptor.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
   }
 
 }
\ No newline at end of file
index 51089f7..426fd8d 100644 (file)
@@ -57,7 +57,7 @@ public class WindowExample implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
     KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
@@ -67,8 +67,8 @@ public class WindowExample implements StreamApplication {
 
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
-    MessageStream<PageViewEvent> inputStream = appDesc.getInputStream(inputStreamDescriptor);
-    OutputStream<Integer> outputStream = appDesc.getOutputStream(outputStreamDescriptor);
+    MessageStream<PageViewEvent> inputStream = appDescriptor.getInputStream(inputStreamDescriptor);
+    OutputStream<Integer> outputStream = appDescriptor.getOutputStream(outputStreamDescriptor);
 
     // create a tumbling window that outputs the number of message collected every 10 minutes.
     // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
index 2e51f6a..2002ce6 100644 (file)
@@ -37,9 +37,9 @@ public class TestStandaloneIntegrationApplication implements StreamApplication {
   private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     String systemName = "testSystemName";
-    String inputStreamName = appDesc.getConfig().get("input.stream.name");
+    String inputStreamName = appDescriptor.getConfig().get("input.stream.name");
     String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
     LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName);
     KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName);
@@ -49,6 +49,6 @@ public class TestStandaloneIntegrationApplication implements StreamApplication {
         kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
     KafkaOutputDescriptor<KV<Object, Object>> osd =
         kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde);
-    appDesc.getInputStream(isd).sendTo(appDesc.getOutputStream(osd));
+    appDescriptor.getInputStream(isd).sendTo(appDescriptor.getOutputStream(osd));
   }
 }
index 672837b..6f381e2 100644 (file)
@@ -96,11 +96,11 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
     class PipelineApplication implements StreamApplication {
 
       @Override
-      public void describe(StreamApplicationDescriptor appDesc) {
+      public void describe(StreamApplicationDescriptor appDescriptor) {
         DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
         GenericInputDescriptor<KV<String, PageView>> isd =
             sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
-        appDesc.getInputStream(isd)
+        appDescriptor.getInputStream(isd)
             .map(KV::getValue)
             .partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
             .sink((m, collector, coordinator) -> {
index 8431f57..74c32b4 100644 (file)
@@ -151,11 +151,11 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
     class TestStreamApp implements StreamApplication {
 
       @Override
-      public void describe(StreamApplicationDescriptor appDesc) {
+      public void describe(StreamApplicationDescriptor appDescriptor) {
         DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
         GenericInputDescriptor<KV<String, PageView>> isd =
             sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
-        appDesc.getInputStream(isd)
+        appDescriptor.getInputStream(isd)
             .map(KV::getValue)
             .partitionBy(pv -> pv.getMemberId(), pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
             .sink((m, collector, coordinator) -> {
index ef17a22..28d790e 100644 (file)
@@ -35,14 +35,14 @@ public class BroadcastAssertApp implements StreamApplication {
 
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
-    Config config = appDesc.getConfig();
+  public void describe(StreamApplicationDescriptor appDescriptor) {
+    Config config = appDescriptor.getConfig();
     String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
 
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
-    final MessageStream<PageView> broadcastPageViews = appDesc
+    final MessageStream<PageView> broadcastPageViews = appDescriptor
         .getInputStream(isd)
         .broadcast(serde, "pv");
 
index 649c032..eca62d0 100644 (file)
@@ -92,15 +92,15 @@ public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness
     private static transient CountDownLatch containerShutdownLatch;
 
     @Override
-    public void describe(TaskApplicationDescriptor appDesc) {
-      Config config = appDesc.getConfig();
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      Config config = appDescriptor.getConfig();
       String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
 
       final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
       KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
-      appDesc.addInputStream(isd);
-      appDesc.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch));
+      appDescriptor.addInputStream(isd);
+      appDescriptor.setTaskFactory((StreamTaskFactory) () -> new FaultInjectionTask(containerShutdownLatch));
     }
 
     private static class FaultInjectionTask implements StreamTask, ClosableTask {
index a442140..476c0dc 100644 (file)
@@ -129,20 +129,20 @@ public class StreamApplicationIntegrationTest {
 
   private static class PageViewProfileViewJoinApplication implements StreamApplication {
     @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
-      Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(
+    public void describe(StreamApplicationDescriptor appDescriptor) {
+      Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(
           new RocksDbTableDescriptor<Integer, TestTableData.Profile>("profile-view-store",
               KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
 
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
       KafkaInputDescriptor<TestTableData.Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-      appDesc.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
+      appDescriptor.getInputStream(profileISD).map(m -> new KV(m.getMemberId(), m)).sendTo(table);
 
       KafkaInputDescriptor<TestTableData.PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
       KafkaOutputDescriptor<TestTableData.EnrichedPageView> enrichedPageViewOSD =
           ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
-      OutputStream<TestTableData.EnrichedPageView> outputStream = appDesc.getOutputStream(enrichedPageViewOSD);
-      appDesc.getInputStream(pageViewISD)
+      OutputStream<TestTableData.EnrichedPageView> outputStream = appDescriptor.getOutputStream(enrichedPageViewOSD);
+      appDescriptor.getInputStream(pageViewISD)
           .partitionBy(TestTableData.PageView::getMemberId,  pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(
               TestTableData.PageView.class)), "p1")
           .join(table, new PageViewToProfileJoinFunction())
@@ -152,22 +152,22 @@ public class StreamApplicationIntegrationTest {
 
   private static class PageViewFilterApplication implements StreamApplication {
     @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
+    public void describe(StreamApplicationDescriptor appDescriptor) {
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
       KafkaInputDescriptor<KV<String, PageView>> isd =
           ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
-      MessageStream<KV<String, TestData.PageView>> inputStream = appDesc.getInputStream(isd);
+      MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
       inputStream.map(KV::getValue).filter(pv -> pv.getPageKey().equals("inbox"));
     }
   }
 
   private static class PageViewRepartitionApplication implements StreamApplication {
     @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
+    public void describe(StreamApplicationDescriptor appDescriptor) {
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
       KafkaInputDescriptor<KV<String, PageView>> isd =
           ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
-      MessageStream<KV<String, TestData.PageView>> inputStream = appDesc.getInputStream(isd);
+      MessageStream<KV<String, TestData.PageView>> inputStream = appDescriptor.getInputStream(isd);
       inputStream
           .map(KV::getValue)
           .partitionBy(PageView::getMemberId, pv -> pv, KVSerde.of(new IntegerSerde(), new JsonSerdeV2<>(PageView.class)), "p1")
index aa9e107..184bb12 100644 (file)
@@ -207,18 +207,18 @@ public class StreamTaskIntegrationTest {
 
   static public class JoinTaskApplication implements TaskApplication {
     @Override
-    public void describe(TaskApplicationDescriptor appDesc) {
-      appDesc.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask());
-      appDesc.addTable(new InMemoryTableDescriptor("profile-view-store",
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      appDescriptor.setTaskFactory((StreamTaskFactory) () -> new StatefulStreamTask());
+      appDescriptor.addTable(new InMemoryTableDescriptor("profile-view-store",
           KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())));
       KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
       KafkaInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-      appDesc.addInputStream(profileISD);
+      appDescriptor.addInputStream(profileISD);
       KafkaInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDesc.addInputStream(pageViewISD);
+      appDescriptor.addInputStream(pageViewISD);
       KafkaOutputDescriptor<EnrichedPageView> enrichedPageViewOSD =
           ksd.getOutputDescriptor("EnrichedPageView", new NoOpSerde<>());
-      appDesc.addOutputStream(enrichedPageViewOSD);
+      appDescriptor.addOutputStream(enrichedPageViewOSD);
     }
   }
 
index 20f18ee..b6d3ed8 100644 (file)
@@ -39,11 +39,11 @@ public class TestSchedulingApp implements StreamApplication {
   public static final String PAGE_VIEWS = "page-views";
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
     KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde);
-    final MessageStream<PageView> pageViews = appDesc.getInputStream(isd);
+    final MessageStream<PageView> pageViews = appDescriptor.getInputStream(isd);
     final MessageStream<PageView> output = pageViews.flatMap(new FlatmapScheduledFn());
 
     MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde)
index dda31ea..24726f8 100644 (file)
@@ -55,10 +55,10 @@ public class RepartitionJoinWindowApp implements StreamApplication {
   private final List<String> intermediateStreamIds = new ArrayList<>();
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     // offset.default = oldest required for tests since checkpoint topic is empty on start and messages are published
     // before the application is run
-    Config config = appDesc.getConfig();
+    Config config = appDescriptor.getConfig();
     String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY);
     String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY);
     String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY);
@@ -66,8 +66,8 @@ public class RepartitionJoinWindowApp implements StreamApplication {
     KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, new JsonSerdeV2<>(PageView.class));
     KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, new JsonSerdeV2<>(AdClick.class));
 
-    MessageStream<PageView> pageViews = appDesc.getInputStream(id1);
-    MessageStream<AdClick> adClicks = appDesc.getInputStream(id2);
+    MessageStream<PageView> pageViews = appDescriptor.getInputStream(id1);
+    MessageStream<AdClick> adClicks = appDescriptor.getInputStream(id2);
 
     MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews
         .partitionBy(PageView::getViewId, pv -> pv,
index fdf0761..fe8e318 100644 (file)
@@ -47,19 +47,19 @@ public class RepartitionWindowApp implements StreamApplication {
 
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KVSerde<String, PageView> inputSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
     KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new StringSerde());
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<KV<String, PageView>> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
     KafkaOutputDescriptor<KV<String, String>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    appDesc.getInputStream(id)
+    appDescriptor.getInputStream(id)
         .map(KV::getValue)
         .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1")
         .window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1")
         .map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage())))
-        .sendTo(appDesc.getOutputStream(od));
+        .sendTo(appDescriptor.getOutputStream(od));
 
   }
 }
index 6dd4303..508e3dc 100644 (file)
@@ -57,15 +57,15 @@ public class SessionWindowApp implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
     KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
     KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    MessageStream<PageView> pageViews = appDesc.getInputStream(id);
-    OutputStream<KV<String, Integer>> outputStream = appDesc.getOutputStream(od);
+    MessageStream<PageView> pageViews = appDescriptor.getInputStream(id);
+    OutputStream<KV<String, Integer>> outputStream = appDescriptor.getOutputStream(od);
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
index 4b87169..d1bd44f 100644 (file)
@@ -59,15 +59,15 @@ public class TumblingWindowApp implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor appDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
     KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new IntegerSerde());
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
     KafkaOutputDescriptor<KV<String, Integer>> od = ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    MessageStream<PageView> pageViews = appDesc.getInputStream(id);
-    OutputStream<KV<String, Integer>> outputStream = appDesc.getOutputStream(od);
+    MessageStream<PageView> pageViews = appDescriptor.getInputStream(id);
+    OutputStream<KV<String, Integer>> outputStream = appDescriptor.getOutputStream(od);
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))
index 0991fa1..a2170cc 100644 (file)
@@ -59,12 +59,12 @@ public class TestStreamApplication implements StreamApplication {
   }
 
   @Override
-  public void describe(StreamApplicationDescriptor streamAppDesc) {
+  public void describe(StreamApplicationDescriptor appDescriptor) {
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
     KafkaInputDescriptor<String> isd = ksd.getInputDescriptor(inputTopic, new NoOpSerde<>());
     KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor(outputTopic, new StringSerde());
-    MessageStream<String> inputStream = streamAppDesc.getInputStream(isd);
-    OutputStream<String> outputStream = streamAppDesc.getOutputStream(osd);
+    MessageStream<String> inputStream = appDescriptor.getInputStream(isd);
+    OutputStream<String> outputStream = appDescriptor.getOutputStream(osd);
     inputStream.map(new TestMapFunction(appName, processorName)).sendTo(outputStream);
   }
 
index a9741b4..5a977e2 100644 (file)
@@ -416,12 +416,12 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
   static public class MyTaskApplication implements TaskApplication {
     @Override
-    public void describe(TaskApplicationDescriptor appDesc) {
-      appDesc.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
-      appDesc.addTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())));
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      appDescriptor.setTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
+      appDescriptor.addTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())));
       DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
       GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
-      appDesc.addInputStream(pageViewISD);
+      appDescriptor.addInputStream(pageViewISD);
     }
   }
 
index 5852de5..4e410d9 100644 (file)
@@ -130,14 +130,14 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     static final String PROFILE_TABLE = "profile-table";
 
     @Override
-    public void describe(StreamApplicationDescriptor appDesc) {
-      Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor());
+    public void describe(StreamApplicationDescriptor appDescriptor) {
+      Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
       KafkaSystemDescriptor sd =
           new KafkaSystemDescriptor("test");
-      appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
+      appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
           .partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
           .join(table, new PageViewToProfileJoinFunction())
-          .sendTo(appDesc.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
+          .sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {