SAMZA-35: Write Javadocs for all samza-api interfaces
authorJakob Homan <jghoman@apache.org>
Thu, 19 Jun 2014 00:59:25 +0000 (17:59 -0700)
committerJakob Homan <jghoman@apache.org>
Thu, 19 Jun 2014 00:59:25 +0000 (17:59 -0700)
49 files changed:
samza-api/src/main/java/org/apache/samza/Partition.java
samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
samza-api/src/main/java/org/apache/samza/config/Config.java
samza-api/src/main/java/org/apache/samza/config/ConfigException.java
samza-api/src/main/java/org/apache/samza/config/ConfigFactory.java
samza-api/src/main/java/org/apache/samza/config/ConfigRewriter.java
samza-api/src/main/java/org/apache/samza/config/MapConfig.java
samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
samza-api/src/main/java/org/apache/samza/job/ApplicationStatus.java
samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
samza-api/src/main/java/org/apache/samza/job/StreamJob.java
samza-api/src/main/java/org/apache/samza/job/StreamJobFactory.java
samza-api/src/main/java/org/apache/samza/metrics/Counter.java
samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java
samza-api/src/main/java/org/apache/samza/metrics/MetricsReporter.java
samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java
samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java [deleted file]
samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java
samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistry.java
samza-api/src/main/java/org/apache/samza/serializers/Deserializer.java
samza-api/src/main/java/org/apache/samza/serializers/Serde.java
samza-api/src/main/java/org/apache/samza/serializers/SerdeFactory.java
samza-api/src/main/java/org/apache/samza/serializers/Serializer.java
samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
samza-api/src/main/java/org/apache/samza/system/OutgoingMessageEnvelope.java
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
samza-api/src/main/java/org/apache/samza/system/SystemProducer.java
samza-api/src/main/java/org/apache/samza/system/SystemStream.java
samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java
samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooser.java
samza-api/src/main/java/org/apache/samza/system/chooser/MessageChooserFactory.java
samza-api/src/main/java/org/apache/samza/task/ClosableTask.java
samza-api/src/main/java/org/apache/samza/task/StreamTask.java
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
samza-api/src/main/java/org/apache/samza/task/TaskLifecycleListenerFactory.java
samza-api/src/main/java/org/apache/samza/task/WindowableTask.java
samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
samza-api/src/main/java/org/apache/samza/util/Clock.java
samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java
samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala

index ebb77ed..66d517d 100644 (file)
@@ -20,7 +20,7 @@
 package org.apache.samza;
 
 /**
- * Used to represent a Samza stream partition.
+ * A numbered, ordered partition of a stream.
  */
 public class Partition {
   private final int partition;
index dcf81bf..6fad1fa 100644 (file)
@@ -25,7 +25,9 @@ import java.util.Map;
 import org.apache.samza.system.SystemStream;
 
 /**
- * Used to represent a checkpoint in the running of a Samza system.
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
  */
 public class Checkpoint {
   private final Map<SystemStream, String> offsets;
index 34f50fd..a6e1ba6 100644 (file)
@@ -22,7 +22,8 @@ package org.apache.samza.checkpoint;
 import org.apache.samza.Partition;
 
 /**
- * Used as a standard interface for writing out checkpoints for a specified partition.
+ * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
+ * implementation-specific location.
  */
 public interface CheckpointManager {
   public void start();
index 5ce8f35..a97ff09 100644 (file)
@@ -22,6 +22,9 @@ package org.apache.samza.checkpoint;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 
+/**
+ * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
+ */
 public interface CheckpointManagerFactory {
   public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
 }
index c42c1c5..2048e90 100644 (file)
@@ -27,6 +27,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Store and retrieve named, typed values as configuration for classes implementing this interface.
+ */
 public abstract class Config implements Map<String, String> {
   public Config subset(String prefix) {
     return subset(prefix, true);
index b6ab549..7619d11 100644 (file)
@@ -21,6 +21,9 @@ package org.apache.samza.config;
 
 import org.apache.samza.SamzaException;
 
+/**
+ * Specific {@link org.apache.samza.SamzaException}s thrown from {@link org.apache.samza.config.Config}
+ */
 public class ConfigException extends SamzaException {
   private static final long serialVersionUID = 1L;
 
index d6d7584..8230f0e 100644 (file)
@@ -21,6 +21,15 @@ package org.apache.samza.config;
 
 import java.net.URI;
 
+/**
+ * Build a {@link org.apache.samza.config.Config}
+ */
 public interface ConfigFactory {
+
+  /**
+   * Build a specific Config.
+   * @param configUri Resource containing information necessary for this Config.
+   * @return Newly constructed Config.
+   */
   Config getConfig(URI configUri);
 }
index 7248e8b..720126a 100644 (file)
@@ -20,7 +20,8 @@
 package org.apache.samza.config;
 
 /**
- * Re-write the job's config before the job is submitted.
+ * A ConfigRewriter receives the job's config during job startup and may re-write it to provide new configs,
+ * remove existing configs or audit and verify the config is correct or permitted.
  */
 public interface ConfigRewriter {
   Config rewrite(String name, Config config);
index 337e921..1a83923 100644 (file)
@@ -26,6 +26,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * A {@link org.apache.samza.config.Config} backed by a Java {@link java.util.Map}
+ */
 public class MapConfig extends Config {
   private final Map<String, String> map;
   
index 5aa7a8f..78d56a9 100644 (file)
@@ -24,6 +24,9 @@ import java.util.Collection;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 
+/**
+ * A SamzaContainerContext maintains per-container information for the tasks it executes.
+ */
 public class SamzaContainerContext {
   public final String name;
   public final Config config;
index 49052af..c41430a 100644 (file)
@@ -19,6 +19,9 @@
 
 package org.apache.samza.job;
 
+/**
+ * Status of a {@link org.apache.samza.job.StreamJob} during and after its run.
+ */
 public enum ApplicationStatus {
   Running("Running"), SuccessfulFinish("SuccessfulFinish"), UnsuccessfulFinish("UnsuccessfulFinish"), New("New");
 
index 5ec6433..cb40092 100644 (file)
@@ -25,6 +25,10 @@ import org.apache.samza.system.SystemStreamPartition;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * CommandBuilders are used to customize the command necessary to launch a Samza Job for a particular framework,
+ * such as YARN or the LocalJobRunner.
+ */
 public abstract class CommandBuilder {
   protected Set<SystemStreamPartition> systemStreamPartitions;
   protected String name;
index f519949..d8bc84f 100644 (file)
 
 package org.apache.samza.job;
 
+/**
+ * A StreamJob runs Samza {@link org.apache.samza.task.StreamTask}s in its specific environment.
+ * Users generally do not need to implement a StreamJob themselves, rather it is a framework-level
+ * interface meant for those extending Samza itself.  This class, and its accompanying factory,
+ * allow Samza to run on other service providers besides YARN and LocalJob, such as Mesos or Sun Grid Engine.
+ */
 public interface StreamJob {
+  /**
+   * Submit this job to be run.
+   * @return An instance of this job after it has been submitted.
+   */
   StreamJob submit();
 
+  /**
+   * Kill this job immediately.
+   *
+   * @return An instance of this job after it has been killed.
+   */
   StreamJob kill();
 
+  /**
+   * Block on this job until either it finishes or reaches its timeout value
+   *
+   * @param timeoutMs How many milliseconds to wait before returning, assuming the job has not yet finished
+   * @return {@link org.apache.samza.job.ApplicationStatus} of the job after finishing or timing out
+   */
   ApplicationStatus waitForFinish(long timeoutMs);
 
+  /**
+   * Block on this job until either it transitions to the specified status or reaches it timeout value
+   *
+   * @param status Target {@link org.apache.samza.job.ApplicationStatus} to wait upon
+   * @param timeoutMs How many milliseconds to wait before returning, assuming the job has not transitioned to the specified value
+   * @return {@link org.apache.samza.job.ApplicationStatus} of the job after finishing or reaching target state
+   */
   ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs);
 
+  /**
+   * Get current {@link org.apache.samza.job.ApplicationStatus} of the job
+   * @return Current job status
+   */
   ApplicationStatus getStatus();
 }
index 4cdcc2c..2e8b159 100644 (file)
@@ -21,6 +21,9 @@ package org.apache.samza.job;
 
 import org.apache.samza.config.Config;
 
+/**
+ * Build a {@link org.apache.samza.job.StreamJob}
+ */
 public interface StreamJobFactory {
   StreamJob getJob(Config config);
 }
index 0838df3..6a85c66 100644 (file)
@@ -22,7 +22,8 @@ package org.apache.samza.metrics;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * A counter is a metric that represents a cumulative value.
+ * A counter is a {@link org.apache.samza.metrics.Metric} that represents a cumulative value.
+ * For example, the number of messages processed since the container was started.
  */
 public class Counter implements Metric {
   private final String name;
index 3335c15..c37bfbb 100644 (file)
@@ -21,6 +21,13 @@ package org.apache.samza.metrics;
 
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * A Gauge is a {@link org.apache.samza.metrics.Metric} that wraps some instance of T in a thread-safe
+ * reference and allows it to be set or retrieved.  Gauages record specific values over time.
+ * For example, the current length of a queue or the size of a buffer.
+ *
+ * @param <T> Instance to be wrapped in the gauge for metering.
+ */
 public class Gauge<T> implements Metric {
   private final String name;
   private AtomicReference<T> ref;
index 9df1ef6..1031e45 100644 (file)
 
 package org.apache.samza.metrics;
 
+/**
+ * A MetricsRegistry allows its users to create new {@link org.apache.samza.metrics.Metric}s and
+ * have those metrics wired to specific metrics systems, such as JMX, provided by {@link org.apache.samza.metrics.MetricsReporter}s.
+ * Those implementing Samza jobs use the MetricsRegistry to register metrics, which then handle
+ * the details of getting those metrics to each defined MetricsReporter.
+ *
+ * Users are free to define their metrics into groups as needed for their jobs. {@link org.apache.samza.metrics.MetricsReporter}s
+ * will likely use the group field to group the user-defined metrics together.
+ */
 public interface MetricsRegistry {
+  /**
+   * Create and register a new {@link org.apache.samza.metrics.Counter}
+   * @param group Group for this Counter
+   * @param name Name of to-be-created Counter
+   * @return New Counter instance
+   */
   Counter newCounter(String group, String name);
 
+  /**
+   * Register existing {@link org.apache.samza.metrics.Counter} with this registry
+   * @param group Group for this Counter
+   * @param counter Existing Counter to register
+   * @return Counter that was registered
+   */
   Counter newCounter(String group, Counter counter);
 
+  /**
+   * Create and register a new {@link org.apache.samza.metrics.Gauge}
+   * @param group Group for this Gauge
+   * @param name Name of to-be-created Gauge
+   * @param value Initial value for the Gauge
+   * @param <T> Type the Gauge will be wrapping
+   * @return Gauge was created and registered
+   */
   <T> Gauge<T> newGauge(String group, String name, T value);
 
+  /**
+   * Register an existing {@link org.apache.samza.metrics.Gauge}
+   * @param group Group for this Gauge
+   * @param value Initial value for the Gauge
+   * @param <T> Type the Gauge will be wrapping
+   * @return Gauge was registered
+   */
   <T> Gauge<T> newGauge(String group, Gauge<T> value);
 }
index d52dfa9..f28746c 100644 (file)
 
 package org.apache.samza.metrics;
 
+/**
+ * A MetricsReporter is the interface that different metrics sinks, such as JMX, implement to receive
+ * metrics from the Samza framework and Samza jobs.
+ */
 public interface MetricsReporter {
   void start();
 
index 19eb91c..7807222 100644 (file)
@@ -21,6 +21,9 @@ package org.apache.samza.metrics;
 
 import org.apache.samza.config.Config;
 
+/**
+ * Build a {@link org.apache.samza.metrics.MetricsReporter}
+ */
 public interface MetricsReporterFactory {
   MetricsReporter getMetricsReporter(String name, String containerName, Config config);
 }
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsType.java
deleted file mode 100644 (file)
index e79d4e6..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.metrics;
-
-public enum MetricsType {
-  MetricsCounter("MetricsCounter"), MetricsGauge("MetricsGauge");
-
-  private final String str;
-
-  private MetricsType(String str) {
-    this.str = str;
-  }
-
-  public String toString() {
-    return str;
-  }
-}
index fee0883..f4f756a 100644 (file)
@@ -20,7 +20,9 @@
 package org.apache.samza.metrics;
 
 /**
- * A metric visitor visits a metric, before metrics are flushed to a metrics stream.
+ * A MetricsVisitor can be used to process each metric in a {@link org.apache.samza.metrics.ReadableMetricsRegistry},
+ * encapsulating the logic of what to be done with each metric in the counter and gauge methods.  This makes it easy
+ * to quickly process all of the metrics in a registry.
  */
 public abstract class MetricsVisitor {
   public abstract void counter(Counter counter);
index ebea426..b495e2a 100644 (file)
@@ -22,6 +22,10 @@ package org.apache.samza.metrics;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * A ReadableMetricsRegistry is a {@link org.apache.samza.metrics.MetricsRegistry} that also
+ * allows read access to the metrics for which it is responsible.
+ */
 public interface ReadableMetricsRegistry extends MetricsRegistry {
   Set<String> getGroups();
 
index fe72223..a55b46a 100644 (file)
@@ -22,7 +22,9 @@ package org.apache.samza.serializers;
 /**
  * A standard interface for Samza compatible deserializers, used for deserializing serialized objects back to their
  * original form.
- * @param <T> The type of serialized object this deserializer should be implemented to deserialize.
+ *
+ * @param <T> The type of serialized object implementations can read
+
  */
 public interface Deserializer<T> {
   /**
index fab1055..a59b8c2 100644 (file)
 
 package org.apache.samza.serializers;
 
+/**
+ * A Serde is a convenience type that implements both the {@link org.apache.samza.serializers.Serializer} and
+ * {@link org.apache.samza.serializers.Deserializer} interfaces, allowing it to both read and write data
+ * in its value type, T.
+ *
+ * @param <T> The type of serialized object implementations can both read and write
+ */
 public interface Serde<T> extends Serializer<T>, Deserializer<T> {
 }
index a41a922..d09defb 100644 (file)
@@ -21,6 +21,10 @@ package org.apache.samza.serializers;
 
 import org.apache.samza.config.Config;
 
+/**
+ * Build an instance of {@link org.apache.samza.serializers.Serde}
+ * @param <T> The type of serialized object this factory's output can both read and write
+ */
 public interface SerdeFactory<T> {
   Serde<T> getSerde(String name, Config config);
 }
index 932e9a5..8388090 100644 (file)
@@ -21,7 +21,8 @@ package org.apache.samza.serializers;
 
 /**
  * A standard interface for Samza compatible serializers, used for serializing objects to bytes.
- * @param <T> The type of object this serializer should be implemented to serialize.
+ *
+ * @param <T> The type of serialized object implementations can write
  */
 public interface Serializer<T> {
   /**
index 96dec9b..747ee2b 100644 (file)
@@ -26,14 +26,19 @@ import org.apache.samza.system.IncomingMessageEnvelope;
 /**
  * A storage engine for managing state maintained by a stream processor.
  * 
- * This interface does not specify any query capabilities, which, of course,
+ * <p>This interface does not specify any query capabilities, which, of course,
  * would be query engine specific. Instead it just specifies the minimum
  * functionality required to reload a storage engine from its changelog as well
  * as basic lifecycle management.
  */
 public interface StorageEngine {
 
-  // TODO javadocs for StorageEngine.init
+  /**
+   * Restore the content of this StorageEngine from the changelog.  Messages are provided
+   * in one {@link java.util.Iterator} and not deserialized for efficiency, allowing the
+   * implementation to optimize replay, if possible.
+   * @param envelopes
+   */
   void restore(Iterator<IncomingMessageEnvelope> envelopes);
 
   /**
index da57bf0..963ccf2 100644 (file)
@@ -33,15 +33,16 @@ import org.apache.samza.task.MessageCollector;
  */
 public interface StorageEngineFactory<K, V> {
 
-  // TODO add Javadocs for MetricsRegistry and MessageCollector args
-
   /**
    * Create an instance of the given storage engine.
+   *
    * @param storeName The name of the storage engine.
    * @param storeDir The directory of the storage engine.
    * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
    * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
-   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog from.
+   * @param collector MessageCollector the storage engine uses to persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
    * @param containerContext Information about the container in which the task is executing.
    * @return The storage engine instance.
    */
index c8ef980..d32d9df 100644 (file)
@@ -20,8 +20,9 @@
 package org.apache.samza.system;
 
 /**
- * This class represents a message envelope that is sent by a StreamTask. It can be thought of as a complement to the
- * IncomingMessageEnvelope class.
+ * An OutgoingMessageEnvelope is sent to a specified {@link SystemStream} via the appropriate {@link org.apache.samza.system.SystemProducer}
+ * from the user's {@link org.apache.samza.task.StreamTask}.  StreamTasks consume from their input streams via their
+ * process method and write to their output streams by sending OutgoingMessageEnvelopes via the provided {@link org.apache.samza.task.MessageCollector}
  */
 public class OutgoingMessageEnvelope {
   private final SystemStream systemStream;
index 3976253..571c606 100644 (file)
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * An interface that's use to interact with the underlying system to fetch
+ * Helper interface attached to an underlying system to fetch
  * information about streams, partitions, offsets, etc. This interface is useful
  * for providing utility methods that Samza needs in order to interact with a
  * system.
index a92e301..591f8fb 100644 (file)
@@ -137,7 +137,7 @@ public interface SystemConsumer {
    * Poll the SystemConsumer to get any available messages from the underlying
    * system.
    * 
-   * If the underlying implementation does not take care to adhere to the
+   * <p>If the underlying implementation does not take care to adhere to the
    * timeout parameter, the SamzaContainer's performance will suffer
    * drastically. Specifically, if poll blocks when it's not supposed to, it
    * will block the entire main thread in SamzaContainer, and no messages will
index ae33e8e..a1aae37 100644 (file)
@@ -22,6 +22,10 @@ package org.apache.samza.system;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 
+/**
+ * Build the {@link org.apache.samza.system.SystemConsumer} and {@link org.apache.samza.system.SystemProducer} for
+ * a particular system, as well as the accompanying {@link org.apache.samza.system.SystemAdmin}.
+ */
 public interface SystemFactory {
   SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry);
 
index 8967f57..9ce3032 100644 (file)
 package org.apache.samza.system;
 
 /**
- * Used as a standard interface for all producers of messages from a specified Samza source.
+ * SystemProducers are how Samza writes messages from {@link org.apache.samza.task.StreamTask}s to outside systems,
+ * such as messaging systems like Kafka, or file systems.  Implementations are responsible for accepting messages
+ * and writing them to their backing systems.
  */
 public interface SystemProducer {
+
+  /**
+   * Start the SystemProducer. After this method finishes it should be ready to accept messages received from the send method.
+   */
   void start();
 
+  /**
+   * Stop the SystemProducer. After this method finished, the system should have completed all necessary work, sent
+   * any remaining messages and will not receive any new calls to the send method.
+   */
   void stop();
 
   /**
@@ -40,5 +50,11 @@ public interface SystemProducer {
    */
   void send(String source, OutgoingMessageEnvelope envelope);
 
+  /**
+   * If the SystemProducer buffers messages before sending them to its underlying system, it should flush those
+   * messages and leave no messages remaining to be sent.
+   *
+   * @param source String representing the source of the message.
+   */
   void flush(String source);
 }
index 0265a2c..7f30e90 100644 (file)
 package org.apache.samza.system;
 
 /**
- * Used to represent a Samza stream.
+ * Streams in Samza consist of both the stream name and the system to which the stream belongs.
+ * Systems are defined through the job config and have corresponding serdes, producers and
+ * consumers in order to deserialize, send to and retrieve from them.  A stream name is dependent
+ * on its system, and may be the topic, queue name, file name, etc. as makes sense for a
+ * particular system.
  */
 public class SystemStream {
   protected final String system;
index 5173ebd..bb69c38 100644 (file)
@@ -22,7 +22,7 @@ package org.apache.samza.system;
 import org.apache.samza.Partition;
 
 /**
- * Aggregate object representing a partition of a Samza stream.
+ * Aggregate object representing a both the {@link org.apache.samza.system.SystemStream} and {@link org.apache.samza.Partition}.
  */
 public class SystemStreamPartition extends SystemStream {
   protected final Partition partition;
index 62a5eb7..9acfb10 100644 (file)
@@ -29,6 +29,10 @@ import java.util.Queue;
 
 import org.apache.samza.SamzaException;
 
+/**
+ * {@link java.util.Iterator} that wraps a {@link org.apache.samza.system.SystemConsumer} to iterate over
+ * the messages the consumer provides for the specified {@link org.apache.samza.system.SystemStreamPartition}.
+ */
 public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEnvelope> {
   private final SystemConsumer systemConsumer;
   private final Map<SystemStreamPartition, Integer> fetchMap;
index 6d2fa23..a8fe2b8 100644 (file)
@@ -25,46 +25,46 @@ import org.apache.samza.system.SystemStreamPartition;
 /**
  * MessageChooser is an interface for programmatic fine-grain control over
  * stream consumption.
- * 
- * Consider the case of a Samza task is consuming multiple streams where some
+ *
+ * <p>Consider the case of a Samza task consuming multiple streams, where some
  * streams may be from live systems that have stricter SLA requirements and must
  * always be prioritized over other streams that may be from batch systems.
  * MessageChooser allows developers to inject message prioritization logic into
  * the SamzaContainer.
- * 
- * In general, the MessageChooser can be used to prioritize certain systems,
+ *
+ * <p>In general, the MessageChooser can be used to prioritize certain systems,
  * streams or partitions over others. It can also be used to throttle certain
- * partitions if it chooses not to return messages even though they are
- * available when choose is invoked. The MessageChooser can also throttle the
- * entire SamzaContainer by performing a blocking operation, such as
- * Thread.sleep.
- * 
- * The manner in which MessageChooser is used is:
- * 
+ * partitions, by choosing not to return messages even though they are
+ * available. The MessageChooser can also throttle the entire SamzaContainer by
+ * performing a blocking operation, such as Thread.sleep.
+ *
+ * <p>The manner in which MessageChooser is used is:
+ *
  * <ul>
- * <li>SystemConsumers buffers messages from all SystemStreamPartitions as they
+ * <li>SystemConsumers buffer messages from all SystemStreamPartitions as they
  * become available.</li>
  * <li>If MessageChooser has no messages for a given SystemStreamPartition, and
- * SystemConsumers has a message in its buffer for the SystemStreamPartition,
+ * a SystemConsumer has a message in its buffer for the SystemStreamPartition,
  * the MessageChooser will be updated once with the next message in the buffer.</li>
  * <li>When SamzaContainer is ready to process another message, it calls
- * SystemConsumers.choose, which in-turn calls MessageChooser.choose.</li>
+ * SystemConsumers.choose, which in turn calls {@link MessageChooser#choose}.</li>
  * </ul>
- * 
- * Since the MessageChooser only receives one message at a time per
- * SystemStreamPartition, it can be used to order messages between different
+ *
+ * <p>Since the MessageChooser only receives one message at a time per
+ * {@link SystemStreamPartition}, it can be used to order messages between different
  * SystemStreamPartitions, but it can't be used to re-order messages within a
  * single SystemStreamPartition (a buffered sort). This must be done within a
  * StreamTask.
- * 
- * The contract between the MessageChooser and the SystemConsumers is:
- * 
+ *
+ * <p>The contract between the MessageChooser and the SystemConsumers is:
+ *
  * <ul>
- * <li>Update can be called multiple times before choose is called.</li>
- * <li>A null return from MessageChooser.choose means no envelopes should be
+ * <li>{@link #update(IncomingMessageEnvelope)} can be called multiple times
+ * before {@link #choose()} is called.</li>
+ * <li>If {@link #choose()} returns null, that means no envelopes should be
  * processed at the moment.</li>
- * <li>A MessageChooser may elect to return null when choose is called, even if
- * unprocessed messages have been given by the update method.</li>
+ * <li>A MessageChooser may elect to return null when {@link #choose()} is
+ * called, even if unprocessed messages have been given by the update method.</li>
  * <li>A MessageChooser will not have any of its in-memory state restored in the
  * event of a failure.</li>
  * <li>Blocking operations (such as Thread.sleep) will block all processing in
@@ -104,7 +104,7 @@ public interface MessageChooser {
   void register(SystemStreamPartition systemStreamPartition, String offset);
 
   /**
-   * Notify the chooser that a new envelope is available for a processing.A
+   * Notify the chooser that a new envelope is available for a processing. A
    * MessageChooser will receive, at most, one outstanding envelope per
    * system/stream/partition combination. For example, if update is called for
    * partition 7 of kafka.mystream, then update will not be called with an
index 6442db9..1c3d3de 100644 (file)
@@ -22,6 +22,9 @@ package org.apache.samza.system.chooser;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistry;
 
+/**
+ * Build an instance of a {@link org.apache.samza.system.chooser.MessageChooser}
+ */
 public interface MessageChooserFactory {
   MessageChooser getChooser(Config config, MetricsRegistry registry);
 }
index a93cca0..36003b3 100644 (file)
 
 package org.apache.samza.task;
 
+/**
+ * A ClosableTask augments {@link org.apache.samza.task.StreamTask}, allowing the method implementer to specify
+ * code that will be called when the StreamTask is being shut down by the framework, providing to emit final metrics,
+ * clean or close resources, etc.  The close method is not guaranteed to be called in event of crash or hard kill
+ * of the process.
+ */
 public interface ClosableTask {
   void close() throws Exception;
 }
index 00d5efd..ed1d387 100644 (file)
@@ -22,8 +22,18 @@ package org.apache.samza.task;
 import org.apache.samza.system.IncomingMessageEnvelope;
 
 /**
- * Used as a standard interface for all user processing tasks. Receives messages from a partition of a specified input
- * stream.
+ * A StreamTask is the basic class on which Samza jobs are implemented.  Developers writing Samza jobs begin by
+ * implementing this class, which processes messages from the job's input streams and writes messages out to
+ * streams via the provided {@link org.apache.samza.task.MessageCollector}.  A StreamTask may be augmented by
+ * implementing other interfaces, such as {@link org.apache.samza.task.InitableTask}, {@link org.apache.samza.task.WindowableTask},
+ * or {@link org.apache.samza.task.ClosableTask}.
+ * <p>
+ * The methods of StreamTasks and associated other tasks are guaranteed to be called in a single-threaded fashion;
+ * no extra synchronization is necessary on the part of the class implementer.  References to instances of
+ * {@link org.apache.samza.system.IncomingMessageEnvelope}s,{@link org.apache.samza.task.MessageCollector}s, and
+ * {@link org.apache.samza.task.TaskCoordinator} should not be held onto between calls; there is no guarantee that
+ * these will not be invalidated or otherwise used by the framework.
+ *
  */
 public interface StreamTask {
   /**
index 611507e..7c1b085 100644 (file)
@@ -22,6 +22,10 @@ package org.apache.samza.task;
 import org.apache.samza.Partition;
 import org.apache.samza.metrics.MetricsRegistry;
 
+/**
+ * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during
+ * initialization in an {@link org.apache.samza.task.InitableTask} and during calls to {@link org.apache.samza.task.TaskLifecycleListener}s.
+ */
 public interface TaskContext {
   MetricsRegistry getMetricsRegistry();
 
index 5049b1b..6ff1a55 100644 (file)
 
 package org.apache.samza.task;
 
+/**
+ * TaskCoordinators are provided to the process methods of {@link org.apache.samza.task.StreamTask} implementations
+ * to allow the user code to request actions from the Samza framework, including committing the current checkpoints
+ * to configured {@link org.apache.samza.checkpoint.CheckpointManager}s or shutting down the task or all tasks within
+ * a container.
+ * <p>
+ *   This interface may evolve over time.
+ * </p>
+ */
 public interface TaskCoordinator {
   /**
    * Requests that Samza should write out a checkpoint, from which a task can restart
index 31f32bc..5ed7054 100644 (file)
@@ -21,6 +21,9 @@ package org.apache.samza.task;
 
 import org.apache.samza.config.Config;
 
+/**
+ * Build a {@link org.apache.samza.task.TaskLifecycleListener}
+ */
 public interface TaskLifecycleListenerFactory {
   TaskLifecycleListener getLifecyleListener(String name, Config config);
 }
index 1f48eec..62f4e39 100644 (file)
 package org.apache.samza.task;
 
 /**
- * Used as a standard interface to allow user processing tasks to operate on specified time intervals, or "windows".
+ * Add-on interface to {@link org.apache.samza.task.StreamTask} implementations to add code which will be run on
+ * a specified time interval (via configuration).  This can be used to implement direct time-based windowing or,
+ * with a frequent window interval, windowing based on some other condition which is checked during the call to
+ * window.  The window method will be called even if no messages are received for a particular StreamTask.
  */
 public interface WindowableTask {
   /**
@@ -28,6 +31,7 @@ public interface WindowableTask {
    * @param collector Contains the means of sending message envelopes to the output stream. The collector must only
    * be used during the current call to the window method; you should not reuse the collector between invocations
    * of this method.
+   *
    * @param coordinator Manages execution of tasks.
    * @throws Exception Any exception types encountered during the execution of the processing task.
    */
index 7171088..9503739 100644 (file)
@@ -39,7 +39,9 @@ import org.apache.samza.system.SystemStreamPartition;
  * BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
  * Samza's poll() requirements make implementing SystemConsumers somewhat
  * tricky. BlockingEnvelopeMap is provided to help other developers write
- * SystemConsumers.
+ * SystemConsumers. The intended audience is not those writing Samza jobs,
+ * but rather those extending Samza to consume from new types of stream providers
+ * and other systems.
  * </p>
  * 
  * <p>
index e1a77e6..db92114 100644 (file)
@@ -19,6 +19,9 @@
 
 package org.apache.samza.util;
 
+/**
+ * Mockable interface for tracking time.
+ */
 public interface Clock {
   long currentTimeMillis();
 }
index 8bc0764..d7bc4a9 100644 (file)
@@ -23,6 +23,10 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 
+/**
+ * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
+ * recorded but a registry is still required.
+ */
 public class NoOpMetricsRegistry implements MetricsRegistry {
   @Override
   public Counter newCounter(String group, String name) {
index 91c1813..ad21a08 100644 (file)
@@ -202,11 +202,11 @@ class BootstrappingChooser(
    * offset 8 is chosen, not when the message with offset 10 is chosen.
    *
    * @param systemStreamPartition The SystemStreamPartition to check.
-   * @param offset The offset to check.
-   * @param newestOrUpcoming Whether to check the offset against the newest or
-   *                         upcoming offset for the SystemStreamPartition.
-   *                         Upcoming is useful during the registration phase,
-   *                         and newest is useful during the choosing phase.
+   * @param offset The offset of the most recently chosen message.
+   * @param offsetType Whether to check the offset against the newest or
+   *                   upcoming offset for the SystemStreamPartition.
+   *                   Upcoming is useful during the registration phase,
+   *                   and newest is useful during the choosing phase.
    */
   private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
     val systemStream = systemStreamPartition.getSystemStream
index 5fb7a20..c5fe462 100644 (file)
 
 package org.apache.samza.system.kafka
 
-import java.nio.ByteBuffer
-import java.util.Properties
 import scala.collection.mutable.ArrayBuffer
 import grizzled.slf4j.Logging
 import kafka.producer.KeyedMessage
 import kafka.producer.Producer
-import kafka.producer.ProducerConfig
-import org.apache.samza.config.Config
-import org.apache.samza.util.KafkaUtil
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.util.ExponentialSleepStrategy