SAMZA-1234: Documentation for 0.13.0 release
authorJacob Maes <jmaes@linkedin.com>
Thu, 1 Jun 2017 00:23:30 +0000 (17:23 -0700)
committerJacob Maes <jmaes@linkedin.com>
Thu, 1 Jun 2017 00:23:30 +0000 (17:23 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>, Boris Shkolnik <boryas@apache.org>

Closes #204 from jmakes/samza-1236-tutorial-1

15 files changed:
docs/_layouts/default.html
docs/img/versioned/learn/documentation/introduction/coordination-service.png [new file with mode: 0644]
docs/img/versioned/learn/documentation/introduction/execution-plan.png [new file with mode: 0644]
docs/img/versioned/learn/documentation/introduction/layered-arch.png [new file with mode: 0644]
docs/img/versioned/learn/tutorials/hello-samza-high-level/wikipedia-execution-plan.png [new file with mode: 0644]
docs/learn/documentation/versioned/rest/resources/jobs.md
docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
docs/learn/tutorials/versioned/hello-samza-high-level-code.md [new file with mode: 0644]
docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md [new file with mode: 0644]
docs/learn/tutorials/versioned/hello-samza-high-level-zk.md [new file with mode: 0644]
docs/learn/tutorials/versioned/index.md
docs/learn/tutorials/versioned/samza-async-user-guide.md
docs/startup/hello-samza/versioned/index.md
docs/startup/preview/index.md [new file with mode: 0644]
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java

index b9a05fa..e4a2d11 100644 (file)
@@ -65,6 +65,7 @@
             <ul>
               <li><a href="/startup/hello-samza/{{ navLink }}">Hello Samza</a></li>
               <li><a href="/startup/download">Download</a></li>
+              <li><a href="/startup/preview">Feature Preview</a></li>
             </ul>
 
             <h1><i class="fa fa-book"></i> Learn</h1>
diff --git a/docs/img/versioned/learn/documentation/introduction/coordination-service.png b/docs/img/versioned/learn/documentation/introduction/coordination-service.png
new file mode 100644 (file)
index 0000000..02f30d6
Binary files /dev/null and b/docs/img/versioned/learn/documentation/introduction/coordination-service.png differ
diff --git a/docs/img/versioned/learn/documentation/introduction/execution-plan.png b/docs/img/versioned/learn/documentation/introduction/execution-plan.png
new file mode 100644 (file)
index 0000000..11f5caf
Binary files /dev/null and b/docs/img/versioned/learn/documentation/introduction/execution-plan.png differ
diff --git a/docs/img/versioned/learn/documentation/introduction/layered-arch.png b/docs/img/versioned/learn/documentation/introduction/layered-arch.png
new file mode 100644 (file)
index 0000000..ea12ef5
Binary files /dev/null and b/docs/img/versioned/learn/documentation/introduction/layered-arch.png differ
diff --git a/docs/img/versioned/learn/tutorials/hello-samza-high-level/wikipedia-execution-plan.png b/docs/img/versioned/learn/tutorials/hello-samza-high-level/wikipedia-execution-plan.png
new file mode 100644 (file)
index 0000000..bb1e88c
Binary files /dev/null and b/docs/img/versioned/learn/tutorials/hello-samza-high-level/wikipedia-execution-plan.png differ
index 68ce9c2..8282a5d 100644 (file)
@@ -301,7 +301,7 @@ The [SimpleYarnJobProxy](../javadocs/org/apache/samza/rest/proxy/job/SimpleYarnJ
 
 The following is a depiction of the implementation that ships with Samza REST:
 
-![JobsResourceDiagram](/img/{{site.version}}/learn/documentation/rest/JobsResource.png)
+<img src="/img/{{site.version}}/learn/documentation/rest/JobsResource.png" alt="Jobs resource component diagram" style="max-width: 100%; height: auto;" onclick="window.open(this.src)"/>
 
 ## Configuration
 The JobsResource properties should be specified in the same file as the Samza REST configuration. They are specified here for clarity.
index 14e10cc..fea9522 100644 (file)
@@ -45,7 +45,7 @@ ls ${container_working_dir}/state/${store-name}/${task_name}/
 
 This allows the Node Manager's (NM) DeletionService to clean-up the working directory once the application completes or fails. In order to re-use local state store, the state store needs to be persisted outside the scope of NM's deletion service. The cluster administrator should set this location as an environment variable in Yarn - <code>LOGGED\_STORE\_BASE\_DIR</code>.
 
-![samza-host-affinity](/img/{{site.version}}/learn/documentation/yarn/samza-host-affinity.png)
+<img src="/img/{{site.version}}/learn/documentation/yarn/samza-host-affinity.png" alt="Yarn host affinity component diagram" style="max-width: 100%; height: auto;" onclick="window.open(this.src)"/>
 
 Each time a task commits, Samza writes the last materialized offset from the changelog stream to the checksumed file on disk. This is also done on container shutdown. Thus, there is an *OFFSET* file associated with each state stores' changelog partitions, that is consumed by the tasks in the container.
 
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
new file mode 100644 (file)
index 0000000..6c0526e
--- /dev/null
@@ -0,0 +1,421 @@
+---
+layout: page
+title: Hello Samza High Level API - Code Walkthrough
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+This tutorial introduces the high level API by showing you how to build wikipedia application from the [hello-samza high level API Yarn tutorial] (hello-samza-high-level-yarn.html). Upon completion of this tutorial, you'll know how to implement and configure a [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html). Along the way, you'll see how to use some of the basic operators as well as how to leverage key-value stores and metrics in an app.
+
+The same [hello-samza](https://github.com/apache/samza-hello-samza) project is used for this tutorial as for many of the others. You will clone that project and by the end of the tutorial, you will have implemented a duplicate of the `WikipediaApplication`.
+
+Let's get started.
+
+### Get the Code
+
+Check out the hello-samza project:
+
+{% highlight bash %}
+git clone https://git.apache.org/samza-hello-samza.git hello-samza
+cd hello-samza
+git checkout latest
+{% endhighlight %}
+
+This project already contains implementations of the wikipedia application using both the low-level task API and the high-level API. The low-level task implementations are in the `samza.examples.wikipedia.task` package. The high-level application implementation is in the `samza.examples.wikipedia.application` package.
+
+This tutorial will provide step by step instructions to recreate the existing wikipedia application.
+
+### Introduction to Wikipedia Consumer
+In order to consume events from Wikipedia, the hello-samza project includes a `WikipediaSystemFactory` implementation of the Samza [SystemFactory](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemFactory.html) that provides a `WikipediaConsumer`.
+
+The WikipediaConsumer is an implementation of [SystemConsumer](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemConsumer.html) that can consume events from Wikipedia. It is also a listener for events from the `WikipediaFeed`. It's important to note that the events received in `onEvent` are of the type `WikipediaFeedEvent`, so we will expect that type for messages on our input streams. For other systems, the messages may come in the form of `byte[]`. In that case you may want to configure a samza [serde](/learn/documentation/{{site.version}}/container/serialization.html) and the application should expect the output type of that serde.
+
+Now that we understand the Wikipedia system and the types of inputs we'll be processing, we can proceed with creating our application.
+
+### Create the Initial Config
+In the hello-samza project, configs are kept in the _src/main/config/_ path. This is where we will add the config for our application.
+Create a new file named _my-wikipedia-application.properties_ in this location.
+
+#### Core Configuration
+Let's start by adding some of the core properties to the file:
+
+{% highlight bash %}
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+app.class=samza.examples.wikipedia.application.MyWikipediaApplication
+app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=my-wikipedia-application
+job.default.system=kafka
+
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+{% endhighlight %}
+
+Be sure to include the Apache header. The project will not compile without it. 
+
+Here's a brief summary of what we configured so far.
+
+* **app.class**: the class that defines the application logic. We will create this class later.
+* **app.runner.class**: the runner implementation which will launch our application. Since we are using YARN, we use `RemoteApplicationRunner` which is required for any cluster-based deployment.
+* **job.factory.class**: the [factory](/learn/documentation/{{site.version}}/jobs/job-runner.html) that will create the runtime instances of our jobs. Since we are using YARN, we want each job to be created as a [YARN job](/learn/documentation/{{site.version}}/jobs/yarn-jobs.html), so we use `YarnJobFactory`
+* **job.name**: the primary identifier for the job.
+* **job.default.system**: the default system to use for input, output, and internal metadata streams. This can be overridden on a per-stream basis. The _kafka_ system will be defined in the next section.
+* **yarn.package.path**: tells YARN where to find the [job package](/learn/documentation/{{site.version}}/jobs/packaging.html) so the Node Managers can download it.
+
+These basic configurations are enough to launch the application on YARN but we haven’t defined any streaming systems for Samza to use, so the application would not process anything.
+
+Next, let's define the streaming systems with which the application will interact. 
+
+#### Define Systems
+This Wikipedia application will consume events from Wikipedia and produce stats to a Kafka topic. We need to define those systems in config before Samza can use them. Add the following lines to the config:
+
+{% highlight bash %}
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+systems.kafka.default.stream.samza.msg.serde=json
+{% endhighlight %}
+
+The above configuration defines 2 systems; one called _wikipedia_ and one called _kafka_.
+
+A factory is required for each system, so the _systems.system-name.samza.system.factory_ property is required for both systems. The other properties are system and use-case specific.
+
+For the _kafka_ system, we set the default replication factor to 1 for all streams because this application is intended for a demo deployment which utilizes a Kafka cluster with only 1 broker, so a replication factor larger than 1 is invalid. The default serde is JSON, which means by default any streams consumed or produced to the _kafka_ system will use a _json_ serde, which we will define in the next section.
+
+The _wikipedia_ system does not need a serde because the `WikipediaConsumer` already produces a usable type.
+
+#### Serdes
+Next, we need to configure the [serdes](/learn/documentation/{{site.version}}/container/serialization.html) we will use for streams and stores in the application.
+{% highlight bash %}
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+{% endhighlight %}
+
+The _json_ serde was used for the _kafka_ system above. The _string_ and _integer_ serdes will be used later.
+
+#### Configure Streams
+Samza identifies streams using a unique stream ID. In most cases, the stream ID is the same as the actual stream name. However, if a stream has a name that doesn't match the pattern `[A-Za-z0-9_-]+`, we need to configure a separate _physical.name_ to associate the actual stream name with a legal stream ID. The Wikipedia channels we will consume have a '#' character in the names. So for each of them we must pick a legal stream ID and then configure the physical name to match the channel.
+
+Samza uses the _job.default.system_ for any streams that do not explicitly specify a system. In the previous sections, we defined 2 systems, _wikipedia_ and _kafka_, and we configured _kafka_ as the default. To understand why, let's look at the streams and how Samza will use them.
+
+For this app, Samza will:
+
+1. Consume from input streams
+2. Produce to an output stream and a metrics stream
+3. Both produce and consume from job-coordination, checkpoint, and changelog streams
+
+While the _wikipedia_ system is necessary for case 1, it does not support producers (we can't write Samza output to Wikipedia), which are needed for cases 2-3. So it is more convenient to use _kafka_ as the default system. We can then explicitly configure the input streams to use the _wikipedia_ system.
+
+{% highlight bash %}
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+{% endhighlight %}
+
+The above configurations declare 3 streams with IDs, _en-wikipedia_, _en-wiktionary_, and _en-wikinews_. It associates each stream with the _wikipedia_ system we defined earlier and set the physical name to the corresponding Wikipedia channel. 
+
+Since all the Kafka streams for cases 2-3 are on the default system and do not include special characters in their names, we do not need to configure them explicitly.
+
+### Create a StreamApplication
+
+With the core configuration settled, we turn our attention to code.
+
+### Define Application Logic
+Let's create the application class we configured above. The next 8 sections walk you through writing the code for the Wikipedia application.
+
+Create a new class named `MyWikipediaApplication` in the `samza.examples.wikipedia.application` package. The class must implement [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) and should look like this:
+
+{% highlight java %}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package samza.examples.wikipedia.application;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+
+public class MyWikipediaApplication implements StreamApplication{
+  @Override
+  public void init(StreamGraph streamGraph, Config config) {
+    
+  }
+}
+{% endhighlight %}
+
+Be sure to include the Apache header. The project will not compile without it.
+
+The [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method is where the application logic is defined. The [Config](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/config/Config.html) argument is the runtime configuration loaded from the properties file we defined earlier. The [StreamGraph](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html) argument provides methods to declare input streams. You can then invoke a number of flexible operations on those streams. The result of each operation is another stream, so you can keep chaining more operations or direct the result to an output stream.
+
+Next, we will declare the input streams for the Wikipedia application.
+
+#### Inputs
+The Wikipedia application consumes events from three channels. Let's declare each of those channels as an input streams via the [StreamGraph](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html) in the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method.
+{% highlight java %}
+MessageStream<WikipediaFeedEvent> wikipediaEvents = streamGraph.getInputStream("en-wikipedia", (k, v) -> (WikipediaFeedEvent) v);
+MessageStream<WikipediaFeedEvent> wiktionaryEvents = streamGraph.getInputStream("en-wiktionary", (k, v) -> (WikipediaFeedEvent) v);
+MessageStream<WikipediaFeedEvent> wikiNewsEvents = streamGraph.getInputStream("en-wikinews", (k, v) -> (WikipediaFeedEvent) v);
+{% endhighlight %}
+
+The first argument to the [getInputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-java.util.function.BiFunction-) method is the stream ID. Each ID must match the corresponding stream IDs we configured earlier.
+
+The second argument is the *message builder*. It converts the input key and message to the appropriate type. In this case, we don't have a key and want to sent the events as-is, so we have a very simple builder that just forwards the input value.
+
+Note the streams are all MessageStreams of type WikipediaFeedEvent. [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) is the in-memory representation of a stream in Samza. It uses generics to ensure type safety across the streams and operations. We knew the WikipediaFeedEvent type by inspecting the WikipediaConsumer above and we made it explicit with the cast on the output of the MessageBuilder. If our inputs used a serde, we would know the type based on which serde is configured for the input streams.
+
+#### Merge
+We'd like to use the same processing logic for all three input streams, so we will use the [mergeAll](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) operator to merge them together. Note: this is not the same as a [join](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-) because we are not associating events by key. We are simply combining three streams into one, like a union.
+
+Add the following snippet to the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method. It merges all the input streams into a new one called _allWikipediaEvents_
+{% highlight java %}
+MessageStream<WikipediaFeed.WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+{% endhighlight %}
+
+Note there is a [merge](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#merge-java.util.Collection-) operator instance method on MessageStream, but the static [mergeAll](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) method is a more convenient alternative if you need to merge many streams.
+
+#### Parse
+The next step is to parse the events and extract some information. We will use the pre-existing `WikipediaParser.parseEvent()' method to do this. The parser extracts some flags we want to monitor as well as some metadata about the event. Inspect the method signature. The input is a WikipediaFeedEvents and the output is a Map<String, Object>. These types will be reflected in the types of the streams before and after the operation.
+
+In the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method, invoke the [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) operation on `allWikipediaEvents`, passing the `WikipediaParser::parseEvent` method reference as follows:
+
+{% highlight java %}
+allWikipediaEvents.map(WikipediaParser::parseEvent);
+{% endhighlight %}
+
+#### Window
+Now that we have the relevant information extracted, let's perform some aggregations over a 10-second [window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Window.html).
+
+First, we need a container class for statistics we want to track. Add the following static class after the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method.
+{% highlight java %}
+private static class WikipediaStats {
+  int edits = 0;
+  int byteDiff = 0;
+  Set<String> titles = new HashSet<String>();
+  Map<String, Integer> counts = new HashMap<String, Integer>();
+}
+{% endhighlight %}
+
+Now we need to define the logic to aggregate the stats over the duration of the window. To do this, we implement [FoldLeftFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html) by adding the following class after the `WikipediaStats` class:
+{% highlight java %}
+private class WikipediaStatsAggregator implements FoldLeftFunction<Map<String, Object>, WikipediaStats> {
+
+  @Override
+  public WikipediaStats apply(Map<String, Object> edit, WikipediaStats stats) {
+    // Update window stats
+    stats.edits++;
+    stats.byteDiff += (Integer) edit.get("diff-bytes");
+    stats.titles.add((String) edit.get("title"));
+
+    Map<String, Boolean> flags = (Map<String, Boolean>) edit.get("flags");
+    for (Map.Entry<String, Boolean> flag : flags.entrySet()) {
+      if (Boolean.TRUE.equals(flag.getValue())) {
+        stats.counts.compute(flag.getKey(), (k, v) -> v == null ? 0 : v + 1);
+      }
+    }
+
+    return stats;
+  }
+}
+{% endhighlight %}
+
+Note: the type parameters for [FoldLeftFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html) reflect the upstream data type and the window value type, respectively. In our case, the upstream type is the output of the parser and the window value is our `WikipediaStats` class.
+
+Finally, we can define our [window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Window.html) back in the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method by chaining the result of the parser:
+{% highlight java %}
+allWikipediaEvents.map(WikipediaParser::parseEvent)
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()));
+{% endhighlight %}
+
+This defines an unkeyed [tumbling window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Windows.html) that spans 10s, which instantiates a new `WikipediaStats` object at the beginning of each window and aggregates the stats using `WikipediaStatsAggregator`.
+
+The output of the window is a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) with a key and value. Since we used an unkeyed tumbling window, the key is `Void`. The value is our `WikipediaStats` object.
+
+#### Output
+We want to use a JSON serializer to output the window values to Kafka, so we will do one more [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) to format the output.
+
+First, let's define the method to format the stats as a `Map<String, String>` so the _json_ serde can handle it. Paste the following after the aggregator class:
+{% highlight java %}
+private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
+  WikipediaStats stats = statsWindowPane.getMessage();
+
+  Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
+  counts.put("edits", stats.edits);
+  counts.put("bytes-added", stats.byteDiff);
+  counts.put("unique-titles", stats.titles.size());
+
+  return counts;
+}
+{% endhighlight %}
+
+Now, we can invoke the method by adding another [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) operation to the chain in [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-). The operator chain should now look like this:
+{% highlight java %}
+allWikipediaEvents.map(WikipediaParser::parseEvent)
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+        .map(this::formatOutput);
+{% endhighlight %}
+
+Next we need to get the output stream to which we will send the stats. Insert the following line below the creation of the 3 input streams:
+{% highlight java %}
+OutputStream<Void, Map<String, Integer>, Map<String, Integer>>
+        wikipediaStats = streamGraph.getOutputStream("wikipedia-stats", m -> null, m -> m);
+{% endhighlight %}
+
+The [OutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/OutputStream.html) is parameterized by 3 types; the key type for the output, the value type for the output, and upstream type.
+
+The first parameter of [getOutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-java.util.function.Function-java.util.function.Function-) is the output stream ID. We will use _wikipedia-stats_ and since it contains no special characters, we won't bother configuring a physical name so Samza will use the stream ID as the topic name.
+
+The second and third parameters are the *key extractor* and the *message extractor*, respectively. We have no key, so the *key extractor* simply produces null. The *message extractor* simply passes the message because it's already the correct type for the _json_ serde. Note: we could have skipped the previous [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) operator and invoked our formatter here, but we kept them separate for pedagogical purposes.
+
+Finally, we can send our output to the output stream using the [sendTo](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-) operator:
+{% highlight java %}
+allWikipediaEvents.map(WikipediaParser::parseEvent)
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()))
+        .map(this::formatOutput)
+        .sendTo(wikipediaStats);
+{% endhighlight %}
+
+Tip: Because the MessageStream type information is preserved in the operator chain, it is often easier to define the OutputStream inline with the [sendTo](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-) operator and then refactor it for readability. That way you don't have to hunt down the types.
+
+#### KVStore
+We now have an operational Wikipedia application which provides stats aggregated over a 10 second interval. One of those stats is a count of the number of edits within the 10s window. But what if we want to keep an additional durable counter of the total edits?
+
+We will do this by keeping a separate count outside the window and persisting it in a [KeyValueStore](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/storage/kv/KeyValueStore.html).
+
+We start by defining the store in the config file:
+{% highlight bash %}
+stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
+stores.wikipedia-stats.key.serde=string
+stores.wikipedia-stats.msg.serde=integer
+{% endhighlight %}
+
+These properties declare a [RocksDB](http://rocksdb.org/) key-value store named "wikipedia-stats". The store is replicated to a changelog stream called "wikipedia-stats-changelog" on the _kafka_ system for durability. It uses the _string_ and _integer_ serdes you defined earlier for keys and values respectively.
+
+Next, we add a total count member variable to the `WikipediaStats` class:
+{% highlight java %}
+int totalEdits = 0;
+{% endhighlight %}
+
+To use the store in the application, we need to get it from the [TaskContext](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/task/TaskContext.html). Also, since we want to emit the total edit count along with the window edit count, it's easiest to update both of them in our aggregator. Declare the store as a member variable of the `WikipediaStatsAggregator` class:
+{% highlight java %}
+private KeyValueStore<String, Integer> store;
+{% endhighlight %}
+
+Then override the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html#init-org.apache.samza.config.Config-org.apache.samza.task.TaskContext-) method in `WikipediaStatsAggregator` to initialize the store.
+{% highlight java %}
+@Override
+public void init(Config config, TaskContext context) {
+  store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
+}
+{% endhighlight %}
+
+Update and persist the counter in the `apply` method.
+{% highlight java %}
+Integer editsAllTime = store.get("count-edits-all-time");
+if (editsAllTime == null) editsAllTime = 0;
+editsAllTime++;
+store.put("count-edits-all-time", editsAllTime);
+stats.totalEdits = editsAllTime;
+{% endhighlight %}
+
+Finally, update the `MyWikipediaApplication#formatOutput` method to include the total counter.
+{% highlight java %}
+counts.put("edits-all-time", stats.totalEdits);
+{% endhighlight %}
+
+#### Metrics
+Lastly, let's add a metric to the application which counts the number of repeat edits each topic within the window interval.
+
+As with the key-value store, we must first define the metrics reporters in the config file.
+{% highlight bash %}
+metrics.reporters=snapshot,jmx
+metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+metrics.reporter.snapshot.stream=kafka.metrics
+metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+{% endhighlight %}
+
+The above properties define 2 metrics reporters. The first emits metrics to a _metrics_ topic on the _kafka_ system. The second reporter emits metrics to JMX.
+
+In the WikipediaStatsAggregator, declare a counter member variable.
+{% highlight java %}
+private Counter repeatEdits;
+{% endhighlight %}
+
+Then add the following to the `WikipediaStatsAggregator#init` method to initialize the counter.
+{% highlight java %}
+repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+{% endhighlight %}
+
+Update and persist the counter from the `apply` method.
+{% highlight java %}
+boolean newTitle = stats.titles.add((String) edit.get("title"));
+
+if (!newTitle) {
+  repeatEdits.inc();
+  log.info("Frequent edits for title: {}", edit.get("title"));
+}
+{% endhighlight %}
+
+#### Run and View Plan
+You can set up the grid and run the application using the same instructions from the [hello samza high level API Yarn tutorial] (hello-samza-high-level-yarn.html). The only difference is to replace the `wikipedia-application.properties` config file in the _config-path_ command line parameter with `my-wikipedia-application.properties`
+
+### Summary
+Congratulations! You have built and executed a Wikipedia stream application on Samza using the high level API. The final application should be directly comparable to the pre-existing `WikipediaApplication` in the project.
+
+You can provide feedback on this tutorial in the [dev mailing list](mailto:dev@samza.apache.org).
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md b/docs/learn/tutorials/versioned/hello-samza-high-level-yarn.md
new file mode 100644 (file)
index 0000000..1ad40df
--- /dev/null
@@ -0,0 +1,127 @@
+---
+layout: page
+title: Hello Samza High Level API - YARN Deployment
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza application. It has examples of applications using the low level task API as well as the high level API.
+
+This tutorial demonstrates a simple wikipedia application created with the high level API. The [Hello Samza tutorial] (/startup/hello-samza/{{site.version}}/index.html) is the low-level analog to this tutorial. It demonstrates the same logic but is created with the task API. The tutorials are designed to be as similar as possible. The primary differences are that with the high level API we accomplish the equivalent of 3 separate low-level jobs with a single application, we skip the intermediate topics for simplicity, and we can visualize the execution plan after we start the application.
+
+### Get the Code
+
+Check out the hello-samza project:
+
+{% highlight bash %}
+git clone https://git.apache.org/samza-hello-samza.git hello-samza
+cd hello-samza
+git checkout latest
+{% endhighlight %}
+
+This project contains everything you'll need to run your first Samza application.
+
+### Start a Grid
+
+A Samza grid usually comprises three different systems: [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html), [Kafka](http://kafka.apache.org/), and [ZooKeeper](http://zookeeper.apache.org/). The hello-samza project comes with a script called "grid" to help you setup these systems. Start by running:
+
+{% highlight bash %}
+./bin/grid bootstrap
+{% endhighlight %}
+
+This command will download, install, and start ZooKeeper, Kafka, and YARN. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called "deploy" inside hello-samza's root folder.
+
+If you get a complaint that JAVA_HOME is not set, then you'll need to set it to the path where Java is installed on your system.
+
+Once the grid command completes, you can verify that YARN is up and running by going to [http://localhost:8088](http://localhost:8088). This is the YARN UI.
+
+### Build a Samza Application Package
+
+Before you can run a Samza application, you need to build a package for it. This package is what YARN uses to deploy your apps on the grid.
+
+NOTE: if you are building from the latest branch of hello-samza project, make sure that you run the following step from your local Samza project first:
+
+{% highlight bash %}
+./gradlew publishToMavenLocal
+{% endhighlight %}
+
+Then, you can continue w/ the following command in hello-samza project:
+
+{% highlight bash %}
+mvn clean package
+mkdir -p deploy/samza
+tar -xvf ./target/hello-samza-0.13.0-SNAPSHOT-dist.tar.gz -C deploy/samza
+{% endhighlight %}
+
+### Run a Samza Application
+
+After you've built your Samza package, you can start the app on the grid using the run-app.sh script.
+
+{% highlight bash %}
+./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application.properties
+{% endhighlight %}
+
+The app will do all of the following:
+
+1. Consume 3 feeds of real-time edits from Wikipedia
+3. Parse the events to extract information about the size of the edit, who made the change, etc.
+4. Calculate counts, every ten seconds, for all edits that were made during that window 
+5. Output the counts to the wikipedia-stats topic
+
+For details about how the app works, take a look at the [code walkthrough](hello-samza-high-level-code.html).
+
+Give the job a minute to startup, and then tail the Kafka topic:
+
+{% highlight bash %}
+./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wikipedia-stats
+{% endhighlight %}
+
+The messages in the stats topic look like this:
+
+{% highlight json %}
+{"is-talk":2,"bytes-added":5276,"edits":13,"unique-titles":13}
+{"is-bot-edit":1,"is-talk":3,"bytes-added":4211,"edits":30,"unique-titles":30,"is-unpatrolled":1,"is-new":2,"is-minor":7}
+{"bytes-added":3180,"edits":19,"unique-titles":19,"is-unpatrolled":1,"is-new":1,"is-minor":3}
+{"bytes-added":2218,"edits":18,"unique-titles":18,"is-unpatrolled":2,"is-new":2,"is-minor":3}
+{% endhighlight %}
+
+Pretty neat, right? Now, check out the YARN UI again ([http://localhost:8088](http://localhost:8088)). This time around, you'll see your Samza job is running!
+
+### View the Execution Plan
+Each application goes through an execution planner and you can visualize the execution plan after starting the job by opening the following file in a browser
+{% highlight bash %}
+deploy/samza/bin/plan.html
+{% endhighlight %}
+
+This plan will make more sense after the [code walkthrough](hello-samza-high-level-code.html). For now, just take note that this visualization is available and it is useful for visibility into the structure of the application. For this tutorial, the plan should look something like this:
+
+<img src="/img/{{site.version}}/learn/tutorials/hello-samza-high-level/wikipedia-execution-plan.png" alt="Execution plan" style="max-width: 100%; height: auto;" onclick="window.open(this.src)"/>
+
+
+### Shutdown
+
+To shutdown the app, use the same _run-app.sh_ script with an extra _--operation=kill_ argument
+{% highlight bash %}
+./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-application.properties --operation=kill
+{% endhighlight %}
+
+After you're done, you can clean everything up using the same grid script.
+
+{% highlight bash %}
+./bin/grid stop all
+{% endhighlight %}
+
+Congratulations! You've now setup a local grid that includes YARN, Kafka, and ZooKeeper, and run a Samza application on it. Curious how this application was built? See the [code walk-through](hello-samza-high-level-code.html).
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-zk.md b/docs/learn/tutorials/versioned/hello-samza-high-level-zk.md
new file mode 100644 (file)
index 0000000..9fd947b
--- /dev/null
@@ -0,0 +1,111 @@
+---
+layout: page
+title: Hello Samza High Level API - Zookeeper Deployment
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is designed to get started with your first Samza job.
+In this tutorial, we will learn how to run a Samza application using ZooKeeper deployment model.
+
+### Get the Code
+
+Let's get started by cloning the hello-samza project
+
+{% highlight bash %}
+git clone https://git.apache.org/samza-hello-samza.git hello-samza
+cd hello-samza
+git checkout latest
+{% endhighlight %}
+
+The project comes up with numerous examples and for this tutorial, we will pick the Wikipedia application.
+
+### Setting up the Deployment Environment
+
+For our Wikipedia application, we require two systems: [Kafka](http://kafka.apache.org/) and [ZooKeeper](http://zookeeper.apache.org/). The hello-samza project comes with a script called "grid" to help with the environment setup
+
+{% highlight bash %}
+./bin/grid standalone
+{% endhighlight %}
+
+This command will download, install, and start ZooKeeper and Kafka. It will also check out the latest version of Samza and build it. All package files will be put in a sub-directory called "deploy" inside hello-samza's root folder.
+
+If you get a complaint that JAVA_HOME is not set, then you'll need to set it to the path where Java is installed on your system.
+
+### Building the Hello Samza Project
+
+NOTE: if you are building from the latest branch of hello-samza project and want to use your local copy of samza, make sure that you run the following step from your local Samza project first
+
+{% highlight bash %}
+./gradlew publishToMavenLocal
+{% endhighlight %}
+
+With the environment setup complete, let us move on to building the hello-samza project. Execute the following commands:
+
+{% highlight bash %}
+mvn clean package
+mkdir -p deploy/samza
+tar -xvf ./target/hello-samza-0.13.0-SNAPSHOT-dist.tar.gz -C deploy/samza
+{% endhighlight %}
+
+We are now all set to deploy the application locally.
+
+### Running the Wikipedia application
+
+In order to run the application, we will use the *run-wikipedia-zk-application* script.
+
+{% highlight bash %}
+./deploy/samza/bin/run-wikipedia-zk-application.sh
+{% endhighlight %}
+
+The above command executes the helper script which invokes the *WikipediaZkLocalApplication* main class with the appropriate job configurations as command line arguments. The main class is an application wrapper
+that initializes the application and passes it to the local runner for execution. It is blocking and waits for the *LocalApplicationRunner* to finish.
+
+To run your own application using ZooKeeper deployment model, you would need something similar to *WikipediaZkLocalApplication* class that initializes your application
+and uses the *LocalApplicationRunner* to run it. To learn more about the internals checkout [deployment-models](/startup/preview/) documentation and the [configurations](/learn/documentation/{{site.version}}/jobs/configuration-table.html) table.
+
+Getting back to our example, the application consumes a feed of real-time edits from Wikipedia, and produces them to a Kafka topic called "wikipedia-stats". Give the job a minute to startup, and then tail the Kafka topic. To do so, run the following command:
+
+{% highlight bash %}
+./deploy/kafka/bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wikipedia-stats
+{% endhighlight %}
+
+The messages in the stats topic should look like the sample below:
+
+{% highlight json %}
+{"is-talk":2,"bytes-added":5276,"edits":13,"unique-titles":13}
+{"is-bot-edit":1,"is-talk":3,"bytes-added":4211,"edits":30,"unique-titles":30,"is-unpatrolled":1,"is-new":2,"is-minor":7}
+{"bytes-added":3180,"edits":19,"unique-titles":19,"is-unpatrolled":1,"is-new":1,"is-minor":3}
+{"bytes-added":2218,"edits":18,"unique-titles":18,"is-unpatrolled":2,"is-new":2,"is-minor":3}
+{% endhighlight %}
+
+Excellent! Now that the job is running, open the *plan.html* file under *deploy/samza/bin* directory to take a look at the execution plan for the Wikipedia application.
+The execution plan is a colorful graphic representing various stages of your application and how they are connected. Here is a sample plan visualization:
+
+<img src="/img/{{site.version}}/learn/tutorials/hello-samza-high-level/wikipedia-execution-plan.png" alt="Execution plan" style="max-width: 100%; height: auto;" onclick="window.open(this.src)"/>
+
+
+### Shutdown
+
+The Wikipedia application can be shutdown by terminating the *run-wikipedia-zk-application* script.
+We can use the *grid* script to tear down the local environment ([Kafka](http://kafka.apache.org/) and [Zookeeper](http://zookeeper.apache.org/)).
+
+{% highlight bash %}
+bin/grid stop all
+{% endhighlight %}
+
+Congratulations! You've now successfully run a Samza application using ZooKeeper deployment model. Next up, check out the [deployment-models](/startup/preview/) and [high level API](/startup/preview.html) pages.
index 6d6295f..a9ac6a7 100644 (file)
@@ -18,6 +18,15 @@ title: Tutorials
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
+<!-- Uncomment after these features are fully released
+[[Preview] Hello Samza High Level API Zookeeper Deployment](hello-samza-high-level-zk.html)
+
+[[Preview] Hello Samza High Level API Yarn Deployment](hello-samza-high-level-yarn.html)
+
+[[Preview] Hello Samza High Level API Code](hello-samza-high-level-code.html)
+-->
+
+[Hello Samza Low Level API Yarn Deployment](/startup/hello-samza/{{site.version}}/)
 
 [Remote Debugging with Samza](remote-debugging-samza.html)
 
index 30865a8..3e3314c 100644 (file)
@@ -60,7 +60,7 @@ job.container.thread.pool.size=16
 
 ### Asynchronous Process with AsyncStreamTask API
 
-If your job process is asynchronous, e.g. making non-blocking remote IO calls, [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) interface provides the support for it. In the following example AsyncRestTask makes asynchronous rest call and triggers callback once it's complete. 
+If your job process is asynchronous, e.g. making non-blocking remote IO calls, [AsyncStreamTask](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/task/AsyncStreamTask.html) interface provides the support for it. In the following example AsyncRestTask makes asynchronous rest call and triggers callback once it's complete.
 
 {% highlight java %}
 public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
@@ -98,7 +98,7 @@ public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTas
 }
 {% endhighlight %}
 
-In the above example, the process is not complete when processAsync() returns. In the callback thread from Jersey client, we trigger [TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html) to indicate the process is done. In order to make sure the callback will be triggered within certain time interval, e.g. 5 seconds, you can config the following property:
+In the above example, the process is not complete when processAsync() returns. In the callback thread from Jersey client, we trigger [TaskCallback](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/task/TaskCallback.html) to indicate the process is done. In order to make sure the callback will be triggered within certain time interval, e.g. 5 seconds, you can config the following property:
 
 {% highlight jproperties %}
 # Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
index 89b7ab9..537516b 100644 (file)
@@ -18,7 +18,7 @@ title: Hello Samza
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-The [hello-samza](https://github.com/apache/samza-hello-samza) project is a stand-alone project designed to help you run your first Samza job.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project is an example project designed to help you run your first Samza job.
 
 ### Get the Code
 
@@ -66,10 +66,10 @@ tar -xvf ./target/hello-samza-0.13.0-SNAPSHOT-dist.tar.gz -C deploy/samza
 
 ### Run a Samza Job
 
-After you've built your Samza package, you can start a job on the grid using the run-job.sh script.
+After you've built your Samza package, you can start a job on the grid using the run-app.sh script.
 
 {% highlight bash %}
-deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties
 {% endhighlight %}
 
 The job will consume a feed of real-time edits from Wikipedia, and produce them to a Kafka topic called "wikipedia-raw". Give the job a minute to startup, and then tail the Kafka topic:
@@ -87,8 +87,8 @@ If you can not see any output from Kafka consumer, you may have connection probl
 Let's calculate some statistics based on the messages in the wikipedia-raw topic. Start two more jobs:
 
 {% highlight bash %}
-deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
-deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-stats.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-stats.properties
 {% endhighlight %}
 
 The first job (wikipedia-parser) parses the messages in wikipedia-raw, and extracts information about the size of the edit, who made the change, etc. You can take a look at its output with:
@@ -116,6 +116,11 @@ If you check the YARN UI, again, you'll see that all three jobs are now listed.
 
 ### Shutdown
 
+To shutdown one of the jobs, use the same script with an extra '--operation=kill' argument
+{% highlight bash %}
+deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties --operation=kill
+{% endhighlight %}
+
 After you're done, you can clean everything up using the same grid script.
 
 {% highlight bash %}
diff --git a/docs/startup/preview/index.md b/docs/startup/preview/index.md
new file mode 100644 (file)
index 0000000..963b3fb
--- /dev/null
@@ -0,0 +1,530 @@
+---
+layout: page
+title: Feature Preview
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+1. [Overview](#overview)
+2. [Try It Out](#try-it-out)
+3. [Architecture](#architecture)
+4. [High Level API](#high-level-api)
+5. [Flexible Deployment Model](#flexible-deployment-model)
+
+---
+
+# Overview
+Samza 0.13.0 introduces a new programming model and a new deployment model. They're being released as a preview because they represent major enhancements to how developers work with Samza, so it is beneficial for both early adopters and the Samza development community to experiment with the release and provide feedback. The following sections introduce the new features and link to tutorials which demonstrate how to use them. Please try them and send feedback to the [dev mailing list](mailto:dev@samza.apache.org)
+
+---
+
+### Try it Out
+Want to skip all the details and get some hands on experience? There are three tutorials to help you get acquainted with running Samza applications in both YARN and embedded modes and programming with the high level API:
+
+* [Yarn Deployment](/learn/tutorials/{{site.version}}/hello-samza-high-level-yarn.html) - run a pre-existing Wikipedia application on YARN and observe the output.
+* [High Level API Code Walkthrough](/learn/tutorials/{{site.version}}/hello-samza-high-level-code.html) - walk through building the Wikipedia application, step by step.
+* [ZooKeeper Deployment](/learn/tutorials/{{site.version}}/hello-samza-high-level-zk.html) - run a pre-existing Wikipedia application with ZooKeeper coordination and observe the output.
+
+---
+
+## Architecture
+
+### Introduction
+The Samza high level API provides a unified way to handle both streaming and batch data. You can describe the end-to-end application logic in a single program with operators like map, filter, window, and join to accomplish what previously required multiple jobs. The API is designed to be portable. The same application code can be deployed in batch or streaming modes, embedded or with a cluster manager environments, and can switch between Kafka, Kinesis, HDFS or other systems with a simple configuration change. This portability is enabled by a new architecture which is described in the sections below.
+
+### Concepts
+The Samza architecture has been overhauled with distinct layers to handle each stage of application development. The following diagram shows an overview of Apache Samza architecture with the high level API.
+
+<img src="/img/{{site.version}}/learn/documentation/introduction/layered-arch.png" alt="Architecture diagram" style="max-width: 100%; height: auto;" onclick="window.open(this.src)">
+
+There are four layers in the architecture. The following sections describe each of the layers.
+
+#### I. High Level API
+
+The high level API provides the libraries to define your application logic. The [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) is the central abstraction which your application must implement. You start by declaring your inputs as instances of [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html). Then you can apply operators on each MessageStream like map, filter, window, and join to define the whole end-to-end data processing in a single program.
+
+For a deeper dive into the high level API, see [high level API section](#high-level-api) below.
+
+#### II. ApplicationRunner
+
+Samza uses an [ApplicationRunner]((/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html)) to run a stream application. The ApplicationRunner generates the configs (such as input/output streams), creates intermediate streams, and starts the execution. There are two types of ApplicationRunner:
+
+**RemoteApplicationRunner** - submits the application to a remote cluster. This runner is invoked via the _run-app.sh_ script. To use RemoteApplicationRunner, set the following configurations
+
+{% highlight jproperties %}
+# The StreamApplication class to run
+app.class=com.company.job.YourStreamApplication
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+{% endhighlight %}
+
+Then use _run-app.sh_ to run the application in the remote cluster. The script will invoke the RemoteApplicationRunner, which will launch one or more jobs using the factory specified with *job.factory.class*. Follow the [yarn deployment tutorial](/learn/tutorials/{{site.version}}/hello-samza-high-level-yarn.html) to try it out.
+
+**LocalApplicationRunner** - runs the application in the JVM process of the runner. For example, to launch your application on multiple machines using ZooKeeper for coordination, you can run multiple instances of LocalApplicationRunner on various machines. After the applications load they will start cordinatinating their actions through ZooKeeper. Here is an example to run the StreamApplication in your program using the LocalApplicationRunner:
+
+{% highlight java %}
+public static void main(String[] args) throws Exception {
+CommandLine cmdLine = new CommandLine();
+Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
+StreamApplication app = new YourStreamApplication();
+localRunner.run(app);
+
+// Wait for the application to finish
+localRunner.waitForFinish();
+System.out.println("Application completed with status " + localRunner.status(app));
+}
+{% endhighlight %}
+
+Follow the [ZooKeeper deployment tutorial](/learn/tutorials/{{site.version}}/hello-samza-high-level-zk.html) to try it out.
+
+##### Execution Plan
+
+The ApplicationRunner generates a physical execution plan for your processing logic before it starts executing it. The plan represents the runtime structure of the application. Particularly, it provides visibility into the generated intermediate streams. Once the job is deployed, the plan can be viewed as follows:
+
+* For applications launched using _run-app.sh_, Samza will create a _plan_ directory under your application deployment directory and write the _plan.json_ file there.
+* For the applications launched using your own script (e.g. for LocalApplicationRunner), please create a _plan_ directory at the same level as _bin_, and point the `EXECUTION_PLAN_DIR` environment variable to its location.
+
+To view the plan, open the _bin/plan.html_ file in a browser. Here's a sample plan visualization:
+
+<img src="/img/{{site.version}}/learn/documentation/introduction/execution-plan.png" alt="Execution plan" style="max-width: 100%; height: auto;" onclick="window.open(this.src)"/>
+
+#### III. Execution Models
+
+Samza supports two types of execution models: cluster based execution and embedded execution.
+
+In cluster based execution, Samza will run and manage your application on a multi-tenant cluster. Samza ships with support for [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html). You can implement your own StreamJob and a corresponding ResourceManagerFactory to add support for another cluster manager.
+
+In the embedded execution model, you can use Samza as a lightweight library within your application. You can spin up multiple instances of your application which will distribute and coordinate processing among themselves. This mode provides flexibility for running your applications in arbitrary hosting environments:It also supports pluggable coordination logic with out-of-the-box support for two types of coordination:
+
+* **ZooKeeper based coordination** - Samza can be configured to use ZooKeeper to manage group membership and partition assignment among instances of your application. This allows the you to dynamically scale your application by spinning up more instances or scaling down by shutting some down.
+* **External coordination** - Samza can run your application in a single JVM locally without coordination, or multiple JVMs with a static partition assignment. This is helpful when running in containerized environments like Kubernetes or Amazon ECS.
+
+For more details on running Samza in embedded mode, take a look at the [flexible deployment model](#flexible-deployment-model) section below.
+
+#### IV. Processor
+
+The lowest execution unit of a Samza application is the processor. It reads the configs generated from the [ApplicationRunner](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html) and processes the input stream partitions assigned by the JobCoordinator. It can access local state using a [KeyValueStore]((/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/storage/KeyValueStore.html)) implementation (e.g. RocksDB or in-memory) and remote state (e.g. REST service) using multithreading.
+
+---
+
+## High Level API
+Since the 0.13.0 release, Samza provides a new high level API that simplifies your applications. This API supports operations like re-partitioning, windowing, and joining on streams. You can now express your application logic concisely in few lines of code and accomplish what previously required multiple jobs.
+
+## Code Examples
+
+Check out some examples to see the high-level API in action.
+
+1.  [Pageview AdClick Joiner](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java) demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to analyze which pages get the most ad clicks.
+2.  [Pageview Repartitioner](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewFilterApp.java) illustrates re-partitioning the incoming stream of PageViews.
+3.  [Pageview Sessionizer](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java) groups the incoming stream of events into sessions based on user activity.
+4.  [Pageview by Region](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java) counts the number of views per-region over tumbling time intervals.
+
+
+## Key Concepts
+
+### StreamApplication
+When writing your stream processing application using the Samza high-level API, you should implement a [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) and define your processing logic in the init method.
+{% highlight java %}
+public void init(StreamGraph graph, Config config) { … }
+{% endhighlight %}
+
+For example, here is a StreamApplication that validates and decorates page views with viewer's profile information.
+
+{% highlight java %}
+public class BadPageViewFilter implements StreamApplication {
+  @Override
+public void init(StreamGraph graph, Config config) {
+    MessageStream<PageView> pageViews = graph.getInputStream(“page-views”..);
+
+    pageViews.filter(this::isValidPageView)
+                      .map(this::addProfileInformation)
+                      .sendTo(graph.getOutputStream(“decorated-page-views”..))
+ }
+}
+{% endhighlight %}
+
+### MessageStream
+A MessageStream, as the name implies, represents a stream of messages. A StreamApplication is described as a series of transformations on MessageStreams. You can get a MessageStream in two ways:
+
+1. Using StreamGraph.getInputStream to get the MessageStream for a given input stream (e.g., a Kafka topic).
+
+2. By transforming an existing MessageStream using operations like map, filter, window, join etc.
+
+## Anatomy of a typical Samza StreamApplication
+There are 3 simple steps to write your stream processing applications using the Samza high-level API.
+
+### Step 1: Obtain the input streams:
+You can obtain the MessageStream for your input stream ID (“page-views”) using StreamGraph.getInputStream.
+    {% highlight java %}
+    MessageStream<PageView> pageViewInput = graph.getInputStream(“page-views”, (k,v) -> v);
+    {% endhighlight %}
+
+The first parameter `page-views` is the logical stream ID. Each stream ID is associated with a *physical name* and a *system*. By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system which is specified with the property “job.default.system”. However, the *physical name* and *system* properties can be overridden in configuration. For example, the following configuration defines the stream ID "page-views" as an alias for the PageViewEvent topic in a local Kafka cluster.
+
+{% highlight jproperties %}
+streams.page-views.samza.system=kafka
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.producer.bootstrap.servers=localhost:9092
+streams.page-views.samza.physical.name=PageViewEvent
+{% endhighlight %}
+
+The second parameter `(k,v) -> v` is the MessageBuilder function that is used to construct a message from the incoming key and value.
+
+### Step 2: Define your transformation logic:
+You are now ready to define your StreamApplication logic as a series of transformations on MessageStreams.
+
+{% highlight java %}
+MessageStream<DecoratedPageViews> decoratedPageViews
+                                   = pageViewInput.filter(this::isValidPageView)
+                                                  .map(this::addProfileInformation);
+{% endhighlight %}
+
+### Step 3: Write the output to an output stream:
+Finally, you can create an OutputStream using StreamGraph.getOutputStream and send the transformed messages through it.
+
+{% highlight java %}
+// Send messages with userId as the key to “decorated-page-views”.
+decoratedPageViews.sendTo(
+                          graph.getOutputStream(“decorated-page-views”,
+                                                dpv -> dpv.getUserId(),
+                                                dpv -> dpv));
+{% endhighlight %}
+
+The first parameter `decorated-page-views` is a logical stream ID. The properties for this stream ID can be overridden just like the stream IDs for input streams. For example:
+{% highlight jproperties %}
+streams.decorated-page-views.samza.system=kafka
+streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
+{% endhighlight %}
+
+The second and third parameters define extractors to split the upstream data type into a separate key and value, respectively.
+
+## Operators
+The high level API supports common operators like map, flatmap, filter, merge, joins, and windowing on streams. Most of these operators accept corresponding Functions and these functions are [Initable](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html).
+
+
+### Map
+Applies the provided 1:1 [MapFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/MapFunction.html) to each element in the MessageStream and returns the transformed MessageStream. The MapFunction takes in a single message and returns a single message (potentially of a different type).
+
+{% highlight java %}
+MessageStream<Integer> numbers = ...
+MessageStream<Integer> tripled= numbers.map(m -> m * 3)
+MessageStream<String> stringified = numbers.map(m -> String.valueOf(m))
+{% endhighlight %}
+
+### Flatmap
+Applies the provided 1:n [FlatMapFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/FlatMapFunction.html) to each element in the MessageStream and returns the transformed MessageStream. The FlatMapFunction takes in a single message and returns zero or more messages.
+
+{% highlight java %}
+MessageStream<String> sentence = ...
+// Parse the sentence into its individual words splitting by space
+MessageStream<String> words = sentence.flatMap(sentence ->
+                                                          Arrays.asList(sentence.split(“ ”))
+{% endhighlight %}
+
+### Filter
+Applies the provided [FilterFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/FilterFunction.html) to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
+{% highlight java %}
+MessageStream<String> words = ...
+// Extract only the long words
+MessageStream<String> longWords = words.filter(word -> word.size() > 15);
+// Extract only the short words
+MessageStream<String> shortWords = words.filter(word -> word.size() < 3);
+{% endhighlight %}
+
+### PartitionBy
+Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.
+
+{% highlight java %}
+// Repartition pageView by userId
+MessageStream<PageView> pageViews = ...
+MessageStream<PageView> partitionedPageViews =
+                                        pageViews.partitionBy(pageView -> pageView.getUserId())
+{% endhighlight %}
+
+### Merge
+Merges the MessageStream with all the provided MessageStreams and returns the merged stream.
+
+{% highlight java %}
+MessageStream<ServiceCall> serviceCall1 = ...
+MessageStream<ServiceCall> serviceCall2 = ...
+// Merge individual “ServiceCall” streams and create a new merged MessageStream
+MessageStream<ServiceCall> serviceCallMerged = serviceCall1.merge(serviceCall2)
+{% endhighlight %}
+
+The merge transform preserves the order of each MessageStream, so if message `m1` appears before `m2` in any provided stream, then, `m1` also appears before `m2` in the merged stream.
+
+As an alternative to the `merge` instance method, you also can use the [MessageStream#mergeAll](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) static method to merge MessageStreams without operating on an initial stream.
+
+### SendTo
+Sends all messages from this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing message.
+
+{% highlight java %}
+// Output a new message with userId as the key and region as the value to the “user-region” stream.
+MessageStream<PageView> pageViews = ...
+OutputStream<String, String, PageView> userRegions
+                           = graph.getOutputStream(“user-region”,
+                                                   pageView -> pageView.getUserId(),
+                                                   pageView -> pageView.getRegion())
+pageView.sendTo(userRegions);
+{% endhighlight %}
+
+
+### Sink
+Allows sending messages from this MessageStream to an output system using the provided [SinkFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/SinkFunction.html).
+
+This offers more control than sendTo since the SinkFunction has access to the `MessageCollector` and the `TaskCoordinator`. For instance, you can choose to manually commit offsets, or shut-down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.)
+
+{% highlight java %}
+// Repartition pageView by userId.
+MessageStream<PageView> pageViews = ...
+pageViews.sink( (msg, collector, coordinator) -> {
+// Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.
+ collector.send(new OutgoingMessageEnvelope(new SystemStream(“kafka”,
+                         “TransformedPageViewEvent”), msg));
+} )
+{% endhighlight %}
+
+### Join
+
+The Join operator joins messages from two MessageStreams using the provided pairwise [JoinFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/JoinFunction.html). Messages are joined when the keys extracted from messages from the first stream match keys extracted from messages in the second stream. Messages in each stream are retained for the provided ttl duration and join results are emitted as matches are found.
+
+{% highlight java %}
+// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.
+// Results are produced to a new stream of FulfilledOrderRecord.
+MessageStream<OrderRecord> orders = …
+MessageStream<ShipmentRecord> shipments = …
+
+MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(), Duration.ofMinutes(20) )
+
+// Constructs a new FulfilledOrderRecord by extracting the order timestamp from the OrderRecord and the shipment timestamp from the ShipmentRecord.
+ class OrderShipmentJoiner implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
+   @Override
+   public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+     return new FulfilledOrderRecord(message.orderId, message.orderTimestamp, otherMessage.shipTimestamp);
+   }
+
+   @Override
+   public String getFirstKey(OrderRecord message) {
+     return message.orderId;
+   }
+
+   @Override
+   public String getSecondKey(ShipmentRecord message) {
+     return message.orderId;
+   }
+ }
+
+{% endhighlight %}
+
+
+### Window
+
+#### Windowing Concepts
+**Windows, Triggers, and WindowPanes**: The window operator groups incoming messages in the MessageStream into finite windows. Each emitted result contains one or more messages in the window and is called a WindowPane.
+
+A window can have one or more associated triggers which determine when results from the window are emitted. Triggers can be either [early triggers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-) that allow emitting results speculatively before all data for the window has arrived, or late triggers that allow handling late messages for the window.
+
+**Aggregator Function**: By default, the emitted WindowPane will contain all the messages for the window. Instead of retaining all messages, you typically define a more compact data structure for the WindowPane and update it incrementally as new messages arrive, e.g. for keeping a count of messages in the window. To do this, you can provide an aggregating [FoldLeftFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html) which is invoked for each incoming message added to the window and defines how to update the WindowPane for that message.
+
+**Accumulation Mode**: A window’s accumulation mode determines how results emitted from a window relate to previously emitted results for the same window. This is particularly useful when the window is configured with early or late triggers. The accumulation mode can either be discarding or accumulating.
+
+A *discarding window* clears all state for the window at every emission. Each emission will only correspond to new messages that arrived since the previous emission for the window.
+
+An *accumulating window* retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.
+
+#### Window Types:
+The Samza high-level API currently supports tumbling and session windows.
+
+**Tumbling Window**: A tumbling window defines a series of contiguous, fixed size time intervals in the stream.
+
+Examples:
+{% highlight java %}
+
+// Group the pageView stream into 3 second tumbling windows keyed by the userId.
+MessageStream<PageView> pageViews = ...
+MessageStream<WindowPane<String, Collection<PageView>>> =
+                     pageViews.window(
+                         Windows.keyedTumblingWindow(pageView -> pageView.getUserId(),
+                           Duration.ofSeconds(30)))
+
+
+// Compute the maximum value over tumbling windows of 3 seconds.
+MessageStream<Integer> integers = …
+Supplier<Integer> initialValue = () -> Integer.MIN_VALUE
+FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
+MessageStream<WindowPane<Void, Integer>> windowedStream =
+         integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction))
+
+{% endhighlight %}
+
+
+**Session Window**: A session window groups a MessageStream into sessions. A session captures a period of activity over a MessageStream and is defined by a gap. A session is closed and results are emitted if no new messages arrive for the window for the gap duration.
+
+Examples:
+{% highlight java %}
+
+// Sessionize a stream of page views, and count the number of page-views in a session for every user.
+MessageStream<PageView> pageViews = …
+Supplier<Integer> initialValue = () -> 0
+FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
+Duration sessionGap = Duration.ofMinutes(3);
+MessageStream<WindowPane<String, Integer> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
+    pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator));
+
+// Compute the maximum value over tumbling windows of 3 seconds.
+MessageStream<Integer> integers = …
+Supplier<Integer> initialValue = () -> Integer.MAX_INT
+
+FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
+MessageStream<WindowPane<Void, Integer>> windowedStream =
+     integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction))
+
+{% endhighlight %}
+
+### Known Issues
+Currently, both window and join operators buffer messages in-memory. So, messages could be lost on failures and re-starts.
+
+---
+
+## Flexible Deployment Model
+
+### Introduction
+Prior to Samza 0.13.0, Samza only supported cluster-managed deployment with [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html).
+
+With Samza 0.13.0, the deployment model has been simplified and decoupled from YARN. If you prefer cluster management, you can still use YARN or you can implement your own extension to deploy Samza on other cluster management systems. But what if you want to avoid cluster management systems altogether?
+
+Samza now ships with the ability to deploy applications as a simple embedded library with pluggable coordination. With embedded mode, you can leverage Samza processors directly in your application and deploy it in whatever way you prefer. Samza has a pluggable job coordinator layer to perform leader election and assign work to the processors.
+
+This section will focus on the new embedded deployment capability.
+
+### Concepts
+Let’s take a closer look at how embedded deployment works.
+
+The [architecture](#architecture) section above provided an overview of the layers that enable the flexible deployment model. The new embedded mode comes into the picture at the *deployment* layer. The deployment layer includes assignment of input partitions to the available processors.
+
+There are two types of partition assignment models which are controlled with the *job.coordinator.factory* in configuration:
+
+#### External Partition Management
+With external partition management, Samza doesn’t manage the partitioning by itself. Instead it uses a `PassthroughJobCoordinator` which honors whatever partition mapping is provided by the [SystemStreamPartitionGrouper](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html). There are two common patterns for external partition management:
+
+* **Using high level Kafka consumer** - partition assignment is done by the high level Kafka consumer itself. To use this model, you need to implement and configure a SystemFactory which provides the Kafka high level consumer. Then you need to configure *job.systemstreampartition.grouper.factory* to *org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouper* so Kafka's partition assignments all go to one task.
+* **Customized partitioning** - partition assignment is done with a custom grouper. The grouper logic is completely up to you. A practical example of this model is to implement a custom grouper which reads static partition assignments from the configuration.
+
+Samza ships with a `PassthroughJobCoordinatorFactory` which facilitates this type of partition management.
+
+#### Dynamic Partition Management
+
+With dynamic partitioning, partitions are distributed between the available processors at runtime. If the number of available processors changes (for example, if some are shut down or added) the mapping will be regenerated and re-distributed to all the processors.
+Information about current mapping is contained in a special structure called the JobModel.
+There is one leader processor which generates the JobModel and distributes it to the other processors. The leader is determined by a “leader election” process.
+
+Let’s take a closer look at how dynamic coordination works.
+
+#### Coordination service
+
+Dynamic coordination of the processors assumes presence of a coordination service. The main responsibilities of the service are:
+
+* **Leader Election** - electing a single processor, which will be responsible for JobModel calculation and distribution or for intermediate streams creation.
+* **Central barrier and latch** - coordination primitives used by the processors.
+* **JobModel notifications** - notifying the processors about availability of a new JobModel.
+* **JobModel storage** the coordination service dictates where the JobModel is persisted.
+
+The coordination service is currently derived from the job coordinator factory. Samza ships with a `ZkJobCoordinatorFactory` implementation which has a corresponding `ZkCoordinationServiceFactory`.
+
+Let’s walk through the coordination sequence for a ZooKeeper based embedded application:
+
+* Each processor (participant) will register with the pluggable coordination service. During the registration it will provide its own participant ID.
+* One of the participants will be elected as the leader.
+* The leader monitors the list of all the active participants.
+* Whenever the list of the participants changes, the leader will generate a new JobModel for the current participants.
+* The new JobModel will be pushed to a common storage. The default implementation uses ZooKeeper for this purpose.
+* Participants are notified that the new JobModel is available. Notification is done through the coordination services, e.g. ZooKeeper.
+* The participants will stop processing, apply the new JobModel, and then resume processing.
+
+The following diagram shows the relationships of the coordinators in the ZooKeeper coordination service implementation.
+
+<img src="/img/{{site.version}}/learn/documentation/introduction/coordination-service.png" alt="Coordination service diagram" style="max-width: 100%; height: auto;" onclick="window.open(this.src)">
+
+Here are a few important details about the coordination service:
+
+* In order to ensure that no two partitions are processed twice by different processors, processing is paused and the processors synchronize on a barrier. Once all the processors are paused, the new JobModel is applied and the processing resumes. The barrier is implemented using the coordination service.
+* During startup and shutdown the processors will be joining/leaving one after another. To avoid redundant JobModel re-calculation, there is a debounce timer which waits for some short period of time (2 seconds by default, configurable in a future release) for more processors to join or leave. Each time a processor joins or leaves, the timer is reset. When the timer expires the JobModel is finally recalculated.
+* If the processors require local store for adjacent or temporary data, we would want to keep its mapping across restarts. For this we uses some extra information about each processor, which uniquely identifies it and its location. If the same processor is restarted on the same location we will try to assign it the same partitions. This locality information should survive the restarts, so it is stored on a common storage (currently using ZooKeeper).
+
+### User guide
+Embedded deployment is designed to help users who want more control over the deployment of their application. So it is the user's responsibility to configure and deploy the processors. In case of ZooKeeper coordination, you also need to configure the URL for an instance of ZooKeeper.
+
+Additionally, each processor requires a unique ID to be used with the coordination service. If location affinity is important, this ID should be unique for each processor on a specific hostname (assuming local Storage services). To address this requirement, Samza uses a [ProcessorIdGenerator](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/runtime/ProcessorIdGenerator.html) to provide the ID for each processor. If no generator is explicitly configured, the default one will create a UUID for each processor.
+
+#### Configuration
+To run an embedded Samza processor, you need to configure the coordinator service using the *job.coordinator.factory* property. Also, there is currently one taskname grouper that supports embedded mode, so you must configure that explicitly.
+
+Let’s take a look at how to configure the two coordination service implementations that ship with Samza.
+
+To use ZooKeeper-based coordination, the following configs are required:
+
+{% highlight jproperties %}
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.coordinator.zk.connect=yourzkconnection
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+{% endhighlight %}
+
+To use external coordination, the following configs are needed:
+
+{% highlight properties %}
+job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+{% endhighlight %}
+
+#### API
+As mentioned in the [architecture](#architecture) section above, you use the LocalApplicationRunner to launch your processors from your application code, like this:
+
+{% highlight java %}
+public class WikipediaZkLocalApplication {
+
+ public static void main(String[] args) {
+   CommandLine cmdLine = new CommandLine();
+   OptionSet options = cmdLine.parser().parse(args);
+   Config config = cmdLine.loadConfig(options);
+
+   LocalApplicationRunner runner = new LocalApplicationRunner(config);
+   WikipediaApplication app = new WikipediaApplication();
+
+   runner.run(app);
+   runner.waitForFinish();
+ }
+}
+{% endhighlight %}
+
+In the code above, `WikipediaApplication` is an application written with the [high level API](#high-level-api).
+
+Check out the [tutorial](/learn/tutorials/{{site.version}}/hello-samza-high-level-zk.html) to run this application with ZooKeeper coordination on your machine now.
+
+#### Deployment and Scaling
+You can deploy the application instances in any way you prefer. If using the coordination service, you can add or remove instances at any time and the leader’s job coordinator (elected via the CoordinationService) will automatically recalculate the JobModel after the debounce time and apply it to the available processors. So, to scale up your application, you simply start more processors.
+
+### Known issues
+Take note of the following issues with the embedded deployment feature for the 0.13.0 release. They will be fixed in a subsequent release.
+
+* The GroupByContainerCount default taskname grouper isn’t supported.
+* Host affinity is not enabled.
+* ZkJobCoordinator metrics are not provided yet. Metrics will soon be added for
+    * Number of JobModel recalculations
+    * Number of Read/Writes
+    * Leader reelections
+    * more..
+* The LocalApplicationRunner does not yet support the low level API. This means you cannot use StreamTask with LocalApplicationRunner.
\ No newline at end of file
index 8cf4923..0477854 100644 (file)
@@ -83,7 +83,7 @@ public class KafkaStreamSpec extends StreamSpec {
         if (LogConfig.configNames().contains(entry.getKey())) {
           filteredConfig.put(entry.getKey(), entry.getValue());
         } else {
-          LOG.warn("Property '{}' is not a valid Kafka topic config. It will be ignored.");
+          LOG.warn("Property '{}' is not a valid Kafka topic config. It will be ignored.", entry.getKey());
         }
       }
     }