Documentation fixes for Samza SQL
authorShenoda Guirguis <sguirguis@linkedin.com>
Wed, 2 Jan 2019 21:33:24 +0000 (13:33 -0800)
committerAditya Toomula <atoomula@linkedin.com>
Wed, 2 Jan 2019 21:33:24 +0000 (13:33 -0800)
Author: Shenoda Guirguis <sguirguis@linkedin.com>

Reviewers: atoomula,vjagadish1989

Closes #865 from shenodaguirguis/docfix

docs/learn/documentation/versioned/api/samza-sql.md
docs/startup/code-examples/versioned/index.md
docs/startup/quick-start/versioned/index.md
docs/startup/quick-start/versioned/samza-sql.md [new file with mode: 0644]
docs/startup/quick-start/versioned/samza.md [new file with mode: 0644]

index 0b488b3..01a4283 100644 (file)
@@ -19,65 +19,70 @@ title: Samza SQL
    limitations under the License.
 -->
 
+### Table Of Contents
+- [Introduction](#introduction)
+- [Code Examples](#code-examples)
+- [Key Concepts](#key-concepts)
+  - [SQL Representation](#sql-representation)
+  - [SQL Grammar](#sql-grammar)
+  - [UDFs](#UDFs)
+  - [UDF Polymorphism](#udf-polymorphism)
+- [Known Limitations](#Known-Limitations)
 
-### Overview
-Samza SQL allows you to define your stream processing logic declaratively as a a SQL query.
-This allows you to create streaming pipelines without Java code or configuration unless you require user-defined functions (UDFs). Support for SQL internally uses Apache Calcite, which provides SQL parsing and query planning. The query is automatically translated to Samza's high level API and runs on Samza's execution engine.
+### Introduction
+Samza SQL allows you to define your stream processing logic 
+declaratively as a a SQL query. This allows you to create streaming 
+pipelines without Java code or configuration unless you require 
+user-defined functions (UDFs). 
 
-You can run Samza SQL locally on your machine or on a YARN cluster.
+### Code Examples
 
-### Running Samza SQL on your local machine
-The [Samza SQL console](https://samza.apache.org/learn/tutorials/0.14/samza-tools.html) allows you to experiment with Samza SQL locally on your machine. 
+The [Hello Samza](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/sql) 
+SQL examples demonstrate how to use Samza SQL API.  
 
-#### Setup Kafka
-Follow the instructions from the [Kafka quickstart](http://kafka.apache.org/quickstart) to start the zookeeper and Kafka server.
+- The [filter](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/sql/samza-sql-filter) 
+demonstrates filtering and insert Samza SQL job.
 
-Let us create a Kafka topic named “ProfileChangeStream” for this demo.
+- The [Case-When](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/sql/samza-sql-casewhen/src/main/sql)
+shows how to use ```CASE WHEN``` statement, along with ```UDF``` to identify qualifying events.
 
-```bash
->./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ProfileChangeStream
-```
-
-Download the Samza tools package from [here](https://samza.apache.org/learn/tutorials/0.14/samza-tools.html) and use the `generate-kafka-events` script populate the stream with sample data.
+- The [join](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/sql/samza-sql-stream-table-join)
+demonstrates how to peform a stream-table join. Please note that join operation is currently not 
+fully cooked, and we are actively working on stabilizing it. 
 
-```bash
-> cd samza-tools-<version>
-> ./scripts/generate-kafka-events.sh -t ProfileChangeStream -e ProfileChange
-```
+- The [group by](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/sql/samza-sql-groupby)
+show how to do group by. Similar to Join, Group By is being actively stabilized. 
 
-#### Using the Samza SQL Console
 
+### Key Concepts
 
-The simplest SQL query is to read all events from a Kafka topic `ProfileChangeStream` and print them to the console.
+Each Samza SQL job consists of one or more Samza SQL statements.
+Each statement represents a single streaming pipeline.
 
-```bash
-> ./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select * from kafka.ProfileChangeStream"
-```
+#### SQL Representation
 
-Next, let us project a few fields from the input stream.
+Support for SQL internally uses Apache Calcite, which provides 
+SQL parsing and query planning. The query is automatically translated 
+to Samza's high level API and runs on Samza's execution engine.
 
-```bash
-> ./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select Name, OldCompany, NewCompany from kafka.ProfileChangeStream"
-```
+The mapping from SQL to the Samza's high level API is a simple 
+deterministic one-to-one mapping. For example, ```Select```, i.e., 
+projections, maps to a filter operation, while ```from``` maps to 
+a scan(s) and join(s) - if selecting from multiple streams and tables
+- operators, and so on.  
 
-You can also filter messages in the input stream based on some predicate. In this example, we filter profiles currently working at LinkedIn, whose previous employer matches the regex `.*soft`. The function `RegexMatch(regex, company)` is an example of 
-a UDF that defines a predicate. 
+The table below lists the supported SQL operations.
 
-```bash
-> ./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select Name as __key__, Name, NewCompany, RegexMatch('.*soft', OldCompany) from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"
-```
-
-
-### Running Samza SQL on YARN
-The [hello-samza](https://github.com/apache/samza-hello-samza) project has examples to get started with Samza on YARN. You can define your SQL query in a [configuration file](https://github.com/apache/samza-hello-samza/blob/master/src/main/config/pageview-filter-sql.properties) and submit it to a YARN cluster.
-
-
-```bash
-> ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/page-view-filter-sql.properties
-```
+ Operation | Syntax hints | Comments      
+ --- | --- | --- 
+ PROJECTION | SELECT/INSERT/UPSERT | See [SQL Grammar](#sql-grammer) below 
+ FILTERING | WHERE expression |See [SQL Grammar](#sql-grammer) below 
+ UDFs | udf_name(args)    | In both SELECT and WHERE clause 
+ JOIN | [LEFT/RIGHT] JOIN .. ON .. | Stream-table inner, left- or right-outer joins. Currently not fully stable. 
+ AGGREGATION | COUNT ( ...) .. GROUP BY | Currently only COUNT is supported, using processing-time based window. 
 
 
-### SQL Grammar
+#### SQL Grammar
 
 Samza SQL's grammar is a subset of capabilities supported by Calcite's SQL parser.
 
@@ -127,7 +132,37 @@ values:
   VALUES expression [, expression ]*
 
 ```
+
+#### UDFs
+
+In addition to existing SQL logical operations, Samza SQL 
+allows the user to extend its functionality by running 
+user-code through User Defined Functions (UDFs) as part of the Stream processing 
+pipeline corresponding to the SQL.
+
+
+#### UDF Polymorphism 
+
+Since UDF's execute method takes an array of generic objects as
+parameter, Samza SQL UDF framework is flexible enough to 
+support polymorphic udf functions with varying sets of arguments 
+as long as UDF implementations support them.
+
+For example in the below sql statement, UDF will be passed an 
+object array of size 2 with first element containing id of type  
+"LONG" and second element name of type "String". The type of the 
+objects that are passed depends on the type of those fields in Samza 
+SQL message format.
+
+{% highlight sql %}
+select myudf(id, name) from identity.profile
+{% endhighlight %}
+
+
 ### Known Limitations
 
-Samza SQL only supports simple stateless queries including selections and projections. We are actively working on supporting stateful operations such as aggregations, windows and joins.
+Samza SQL only supports simple stateless queries including selections
+and projections. We are actively working on supporting stateful operations 
+such as aggregations, windows and joins.
+
 
index 96a78af..80e2ed7 100644 (file)
@@ -48,6 +48,16 @@ In addition to the cookbook, you can also consult these:
 
 - [Amazon Kinesis](https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/kinesis) and [Azure Eventhubs](https://github.com/apache/samza-hello-samza/tree/latest/src/main/java/samza/examples/azure) examples that cover how to consume input data from the respective systems.
 
+#### Low-level API examples
+The [Wikipedia Parser (low-level API)](https://github.com/apache/samza-hello-samza/tree/latest/src/main/java/samza/examples/wikipedia/task/application): 
+Same example that builds a streaming pipeline consuming a live-feed of 
+wikipedia edits, parsing each message and generating statistics from them, but
+using low-level APIs. 
+
+#### Samza SQL API examples
+You can easily create a Samza job declaratively using 
+[Samza SQL](https://samza.apache.org/learn/tutorials/0.14/samza-sql.html).
+
 #### Apache Beam API examples
 
 The easiest way to get a copy of the WordCount examples in Beam API is to use [Apache Maven](http://maven.apache.org/download.cgi). After installing Maven, please run the following command:
@@ -110,4 +120,4 @@ OF: 1
 ...
 {% endhighlight %}
 
-A walkthrough of the example code can be found [here](https://beam.apache.org/get-started/wordcount-example/). Feel free to play with other examples in the project or write your own. Please don't hesitate to [reach out](https://samza.apache.org/community/contact-us.html) if you encounter any issues.
\ No newline at end of file
+A walkthrough of the example code can be found [here](https://beam.apache.org/get-started/wordcount-example/). Feel free to play with other examples in the project or write your own. Please don't hesitate to [reach out](https://samza.apache.org/community/contact-us.html) if you encounter any issues.
index 30add8a..b13420b 100644 (file)
@@ -19,227 +19,5 @@ title: Quick Start
    limitations under the License.
 -->
 
-In this tutorial, we will create our first Samza application - `WordCount`. This application will consume messages from a Kafka stream, tokenize them into individual words and count the frequency of each word.  Let us download the entire project from [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
-
-### Setting up a Java Project
-
-Observe the project structure as follows:
-
-{% highlight bash %}
-wordcount
-|-- build.gradle
-|-- gradle.properties
-|-- scripts
-|-- src
-    |-- main
-        |-- config
-        |-- java
-            |-- samzaapp
-                 |-- WordCount.java
-{% endhighlight %}
-
-You can build the project anytime by running:
-
-{% highlight bash %}
-> cd wordcount
-> gradle wrapper --gradle-version 4.9
-> ./gradlew build
-{% endhighlight %}
-
-### Create a Samza StreamApplication
-
-Now let’s write some code! An application written using Samza's [high-level API](/learn/documentation/{{site.version}}/api/api/high-level-api.html) implements the [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) interface:
-
-{% highlight java %}
-package samzaapp;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-
-public class WordCount implements StreamApplication {
- @Override
- public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
- }
-}
-{% endhighlight %}
-
-The interface provides a single method named `describe()`, which allows us to define our inputs, the processing logic and outputs for our application. 
-
-### Describe your inputs and outputs
-
-To interact with Kafka, we will first create a `KafkaSystemDescriptor` by providing the coordinates of the Kafka cluster. For each Kafka topic our application reads from, we create a `KafkaInputDescriptor` with the name of the topic and a serializer. Likewise, for each output topic, we instantiate a corresponding `KafkaOutputDescriptor`. 
-
-{% highlight java %}
-public class WordCount implements StreamApplication {
- private static final String KAFKA_SYSTEM_NAME = "kafka";
- private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
- private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
- private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
-
- private static final String INPUT_STREAM_ID = "sample-text";
- private static final String OUTPUT_STREAM_ID = "word-count-output";
-
- @Override
- public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
-   // Create a KafkaSystemDescriptor providing properties of the cluster
-   KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
-       .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
-       .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
-       .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
-
-   // For each input or output stream, create a KafkaInput/Output descriptor
-   KafkaInputDescriptor<KV<String, String>> inputDescriptor =
-       kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID,
-           KVSerde.of(new StringSerde(), new StringSerde()));
-   KafkaOutputDescriptor<KV<String, String>> outputDescriptor =
-       kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID,
-           KVSerde.of(new StringSerde(), new StringSerde()));
-
-   // Obtain a handle to a MessageStream that you can chain operations on
-   MessageStream<KV<String, String>> lines = streamApplicationDescriptor.getInputStream(inputDescriptor);
-   OutputStream<KV<String, String>> counts = streamApplicationDescriptor.getOutputStream(outputDescriptor);
- }
-}
-{% endhighlight %}
-
-The above example creates a [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) which reads from an input topic named `sample-text`. It also defines an output stream that emits results to a topic named `word-count-output`. Next let’s add our processing logic. 
-
-### Add word count processing logic
-
-Kafka messages typically have a key and a value. Since we only care about the value here, we will apply the `map` operator on the input stream to extract the value. 
-
-{% highlight java %}
-lines.map(kv -> kv.value)
-{% endhighlight %}
-
-Next, we will tokenize the message into individual words using the `flatmap` operator.
-
-{% highlight java %}
-.flatMap(s -> Arrays.asList(s.split("\\W+")))
-{% endhighlight %}
-
-
-We now need to group the words, aggregate their respective counts and periodically emit our results. For this, we will use Samza's session-windowing feature.
-
-{% highlight java %}
-.window(Windows.keyedSessionWindow(
-   w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
-   new StringSerde(), new IntegerSerde()), "count")
-{% endhighlight %}
-
-Let's walk through each of the parameters to the above `window` function:
-The first parameter is a "key function", which defines the key to group messages by. In our case, we can simply use the word as the key. The second parameter is the windowing interval, which is set to 5 seconds. The third parameter is a function which provides the initial value for our aggregations. We can start with an initial count of zero for each word. The fourth parameter is an aggregation function for computing counts. The next two parameters specify the key and value serializers for our window. 
-
-The output from the window operator is captured in a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) type, which contains the word as the key and its count as the value. We add a further `map` to format this into a `KV`, that we can send to our Kafka topic. To write our results to the output topic, we use the `sendTo` operator in Samza.
-
-
-{% highlight java %}
-.map(windowPane ->
-   KV.of(windowPane.getKey().getKey(),
-       windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
-.sendTo(counts);
-{% endhighlight %}
-
-The full processing logic looks like the following:
-
-{% highlight java %}
-lines
-   .map(kv -> kv.value)
-   .flatMap(s -> Arrays.asList(s.split("\\W+")))
-   .window(Windows.keyedSessionWindow(
-       w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
-       new StringSerde(), new IntegerSerde()), "count")
-   .map(windowPane ->
-       KV.of(windowPane.getKey().getKey(),
-           windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
-   .sendTo(counts);
-{% endhighlight %}
-
-
-### Configure your application
-
-In this section, we will configure our word count example to run locally in a single JVM. Let us add a file named “word-count.properties” under the config folder. 
-
-{% highlight jproperties %}
-job.name=word-count
-# Use a PassthroughJobCoordinator since there is no coordination needed
-job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
-job.coordination.utils.factory=org.apache.samza.standalone.PassthroughCoordinationUtilsFactory
-
-job.changelog.system=kafka
-
-# Use a single container to process all of the data
-task.name.grouper.factory=org.apache.samza.container.grouper.task.SingleContainerGrouperFactory
-processor.id=0
-
-# Read from the beginning of the topic
-systems.kafka.default.stream.samza.offset.default=oldest
-{% endhighlight %}
-
-For more details on Samza's configs, feel free to check out the latest [configuration reference](/learn/documentation/{{site.version}}/jobs/configuration-table.html).
-
-### Run your application
-
-We are ready to add a `main()` function to the `WordCount` class. It parses the command-line arguments and instantiates a `LocalApplicationRunner` to execute the application locally.
-
-{% highlight java %}
-public static void main(String[] args) {
- CommandLine cmdLine = new CommandLine();
- OptionSet options = cmdLine.parser().parse(args);
- Config config = cmdLine.loadConfig(options);
- LocalApplicationRunner runner = new LocalApplicationRunner(new WordCount(), config);
- runner.run();
- runner.waitForFinish();
-}
-{% endhighlight %}
-
-
-Before running `main()`, we will create our input Kafka topic and populate it with sample data. You can download the scripts to interact with Kafka along with the sample data from [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
-
-{% highlight bash %}
-> ./scripts/grid install zookeeper && ./scripts/grid start zookeeper
-> ./scripts/grid install kafka && ./scripts/grid start kafka
-{% endhighlight %}
-
-
-{% highlight bash %}
-> ./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic sample-text --partition 1 --replication-factor 1
-> ./deploy/kafka/bin/kafka-console-producer.sh --topic sample-text --broker localhost:9092 < ./sample-text.txt
-{% endhighlight %}
-
-Let’s kick off our application and use gradle to run it. Alternately, you can also run it directly from your IDE, with the same program arguments.
-
-{% highlight bash %}
-> export BASE_DIR=`pwd`
-> ./gradlew run --args="--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$BASE_DIR/src/main/config/word-count.properties"
-{% endhighlight %}
-
-
-The application will output to a Kafka topic named "word-count-output". We will now fire up a Kafka consumer to read from this topic:
-
-{% highlight bash %}
->  ./deploy/kafka/bin/kafka-console-consumer.sh --topic word-count-output --zookeeper localhost:2181 --from-beginning
-{% endhighlight %}
-
-It will show the counts for each word like the following:
-
-{% highlight bash %}
-well: 4
-complaining: 1
-die: 3
-but: 22
-not: 50
-truly: 5
-murmuring: 1
-satisfied: 3
-the: 120
-thy: 8
-gods: 8
-thankful: 1
-and: 243
-from: 16
-{% endhighlight %}
-
-Congratulations! You've successfully run your first Samza application.
-
-### [More Examples >>](/startup/code-examples/{{site.version}})
\ No newline at end of file
+[Samza Tutorial](samza.md)
+[Samza SQL Tutorial](samza-sql.md)
\ No newline at end of file
diff --git a/docs/startup/quick-start/versioned/samza-sql.md b/docs/startup/quick-start/versioned/samza-sql.md
new file mode 100644 (file)
index 0000000..847fb47
--- /dev/null
@@ -0,0 +1,119 @@
+---
+layout: page
+title: Samza SQL Quick Start
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
+### Overview
+Samza SQL allows you to define your stream processing logic declaratively as a a SQL query.
+This allows you to create streaming pipelines without Java code or configuration unless you 
+require user-defined functions ([UDF](#How-to-write-a-UDF?)). 
+
+You can run Samza SQL locally on your machine or on a YARN cluster.
+
+### Running Samza SQL on your local machine
+The [Samza SQL console](https://samza.apache.org/learn/tutorials/0.14/samza-tools.html) allows you to experiment with Samza SQL locally on your machine. 
+
+#### Setup Kafka
+Follow the instructions from the [Kafka quickstart](http://kafka.apache.org/quickstart) to start the zookeeper and Kafka server.
+
+Let us create a Kafka topic named “ProfileChangeStream” for this demo.
+
+```bash
+./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ProfileChangeStream
+```
+
+Download the Samza tools package from [here](https://samza.apache.org/learn/tutorials/0.14/samza-tools.html) and use the `generate-kafka-events` script populate the stream with sample data.
+
+```bash
+cd samza-tools-<version>
+./scripts/generate-kafka-events.sh -t ProfileChangeStream -e ProfileChange
+```
+
+#### Using the Samza SQL Console
+
+
+The simplest SQL query is to read all events from a Kafka topic `ProfileChangeStream` and print them to the console.
+
+```bash
+./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select * from kafka.ProfileChangeStream"
+```
+
+Next, let us project a few fields from the input stream.
+
+```bash
+./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select Name, OldCompany, NewCompany from kafka.ProfileChangeStream"
+```
+
+You can also filter messages in the input stream based on some predicate. In this example, we filter profiles currently working at LinkedIn, whose previous employer matches the regex `.*soft`. The function `RegexMatch(regex, company)` is an example of 
+a UDF that defines a predicate. 
+
+```bash
+./scripts/samza-sql-console.sh --sql "insert into log.consoleoutput select Name as __key__, Name, NewCompany, RegexMatch('.*soft', OldCompany) from kafka.ProfileChangeStream where NewCompany = 'LinkedIn'"
+```
+
+
+### Running Samza SQL on YARN
+The [hello-samza](https://github.com/apache/samza-hello-samza) project has examples to 
+get started with Samza on YARN. You can define your SQL query in a 
+configuration file and submit it to a YARN cluster.
+
+
+```bash
+./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/page-view-filter-sql.properties
+```
+
+ ### How to write a UDF?
+ Right now Samza SQL support Scalar UDFs which means that each 
+ UDF should act on each record at a time and return the result 
+ corresponding to the record. In essence it exhibits the behavior
+  of 1 output to an input. Users need to implement the following 
+  interface to create a UDF.
+  
+{% highlight java %}
+
+ /**
+  * The base class for the Scalar UDFs. All the scalar UDF classes needs to extend this and implement a method named
+  * "execute". The number of arguments for the execute method in the UDF class should match the number of fields
+  * used while invoking this UDF in SQL statement.
+  * Say for e.g. User creates a UDF class with signature int execute(int var1, String var2). It can be used in a SQL query
+  *     select myudf(id, name) from profile
+  * In the above query, Profile should contain fields named 'id' of INTEGER/NUMBER type and 'name' of type VARCHAR/CHARACTER
+  */
+ public interface ScalarUdf {
+   /**
+    * Udfs can implement this method to perform any initialization that they may need.
+    * @param udfConfig Config specific to the udf.
+    */
+   void init(Config udfConfig);
+  
+   /**
+    * Actual implementation of the udf function
+    * @param args
+    *   list of all arguments that the udf needs
+    * @return
+    *   Return value from the scalar udf.
+    */
+   Object execute(Object... args);
+ }
+{% endhighlight %}
+
diff --git a/docs/startup/quick-start/versioned/samza.md b/docs/startup/quick-start/versioned/samza.md
new file mode 100644 (file)
index 0000000..0cf6bd4
--- /dev/null
@@ -0,0 +1,245 @@
+---
+layout: page
+title: Samza Quick Start
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+In this tutorial, we will create our first Samza application - `WordCount`. This application will consume messages from a Kafka stream, tokenize them into individual words and count the frequency of each word.  Let us download the entire project from [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
+
+### Setting up a Java Project
+
+Observe the project structure as follows:
+
+{% highlight bash %}
+wordcount
+|-- build.gradle
+|-- gradle.properties
+|-- scripts
+|-- src
+    |-- main
+        |-- config
+        |-- java
+            |-- samzaapp
+                 |-- WordCount.java
+{% endhighlight %}
+
+You can build the project anytime by running:
+
+{% highlight bash %}
+> cd wordcount
+> gradle wrapper --gradle-version 4.9
+> ./gradlew build
+{% endhighlight %}
+
+### Create a Samza StreamApplication
+
+Now let’s write some code! An application written using Samza's [high-level API](/learn/documentation/{{site.version}}/api/api/high-level-api.html) implements the [StreamApplication](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html) interface:
+
+{% highlight java %}
+package samzaapp;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+
+public class WordCount implements StreamApplication {
+ @Override
+ public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
+ }
+}
+{% endhighlight %}
+
+The interface provides a single method named `describe()`, which allows us to define our inputs, the processing logic and outputs for our application. 
+
+### Describe your inputs and outputs
+
+To interact with Kafka, we will first create a `KafkaSystemDescriptor` by providing the coordinates of the Kafka cluster. For each Kafka topic our application reads from, we create a `KafkaInputDescriptor` with the name of the topic and a serializer. Likewise, for each output topic, we instantiate a corresponding `KafkaOutputDescriptor`. 
+
+{% highlight java %}
+public class WordCount implements StreamApplication {
+ private static final String KAFKA_SYSTEM_NAME = "kafka";
+ private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ private static final String INPUT_STREAM_ID = "sample-text";
+ private static final String OUTPUT_STREAM_ID = "word-count-output";
+
+ @Override
+ public void describe(StreamApplicationDescriptor streamApplicationDescriptor) {
+   // Create a KafkaSystemDescriptor providing properties of the cluster
+   KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(KAFKA_SYSTEM_NAME)
+       .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+       .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+       .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+   // For each input or output stream, create a KafkaInput/Output descriptor
+   KafkaInputDescriptor<KV<String, String>> inputDescriptor =
+       kafkaSystemDescriptor.getInputDescriptor(INPUT_STREAM_ID,
+           KVSerde.of(new StringSerde(), new StringSerde()));
+   KafkaOutputDescriptor<KV<String, String>> outputDescriptor =
+       kafkaSystemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID,
+           KVSerde.of(new StringSerde(), new StringSerde()));
+
+   // Obtain a handle to a MessageStream that you can chain operations on
+   MessageStream<KV<String, String>> lines = streamApplicationDescriptor.getInputStream(inputDescriptor);
+   OutputStream<KV<String, String>> counts = streamApplicationDescriptor.getOutputStream(outputDescriptor);
+ }
+}
+{% endhighlight %}
+
+The above example creates a [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) which reads from an input topic named `sample-text`. It also defines an output stream that emits results to a topic named `word-count-output`. Next let’s add our processing logic. 
+
+### Add word count processing logic
+
+Kafka messages typically have a key and a value. Since we only care about the value here, we will apply the `map` operator on the input stream to extract the value. 
+
+{% highlight java %}
+lines.map(kv -> kv.value)
+{% endhighlight %}
+
+Next, we will tokenize the message into individual words using the `flatmap` operator.
+
+{% highlight java %}
+.flatMap(s -> Arrays.asList(s.split("\\W+")))
+{% endhighlight %}
+
+
+We now need to group the words, aggregate their respective counts and periodically emit our results. For this, we will use Samza's session-windowing feature.
+
+{% highlight java %}
+.window(Windows.keyedSessionWindow(
+   w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+   new StringSerde(), new IntegerSerde()), "count")
+{% endhighlight %}
+
+Let's walk through each of the parameters to the above `window` function:
+The first parameter is a "key function", which defines the key to group messages by. In our case, we can simply use the word as the key. The second parameter is the windowing interval, which is set to 5 seconds. The third parameter is a function which provides the initial value for our aggregations. We can start with an initial count of zero for each word. The fourth parameter is an aggregation function for computing counts. The next two parameters specify the key and value serializers for our window. 
+
+The output from the window operator is captured in a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) type, which contains the word as the key and its count as the value. We add a further `map` to format this into a `KV`, that we can send to our Kafka topic. To write our results to the output topic, we use the `sendTo` operator in Samza.
+
+
+{% highlight java %}
+.map(windowPane ->
+   KV.of(windowPane.getKey().getKey(),
+       windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
+.sendTo(counts);
+{% endhighlight %}
+
+The full processing logic looks like the following:
+
+{% highlight java %}
+lines
+   .map(kv -> kv.value)
+   .flatMap(s -> Arrays.asList(s.split("\\W+")))
+   .window(Windows.keyedSessionWindow(
+       w -> w, Duration.ofSeconds(5), () -> 0, (m, prevCount) -> prevCount + 1,
+       new StringSerde(), new IntegerSerde()), "count")
+   .map(windowPane ->
+       KV.of(windowPane.getKey().getKey(),
+           windowPane.getKey().getKey() + ": " + windowPane.getMessage().toString()))
+   .sendTo(counts);
+{% endhighlight %}
+
+
+### Configure your application
+
+In this section, we will configure our word count example to run locally in a single JVM. Let us add a file named “word-count.properties” under the config folder. 
+
+{% highlight jproperties %}
+job.name=word-count
+# Use a PassthroughJobCoordinator since there is no coordination needed
+job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
+job.coordination.utils.factory=org.apache.samza.standalone.PassthroughCoordinationUtilsFactory
+
+job.changelog.system=kafka
+
+# Use a single container to process all of the data
+task.name.grouper.factory=org.apache.samza.container.grouper.task.SingleContainerGrouperFactory
+processor.id=0
+
+# Read from the beginning of the topic
+systems.kafka.default.stream.samza.offset.default=oldest
+{% endhighlight %}
+
+For more details on Samza's configs, feel free to check out the latest [configuration reference](/learn/documentation/{{site.version}}/jobs/configuration-table.html).
+
+### Run your application
+
+We are ready to add a `main()` function to the `WordCount` class. It parses the command-line arguments and instantiates a `LocalApplicationRunner` to execute the application locally.
+
+{% highlight java %}
+public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config config = cmdLine.loadConfig(options);
+ LocalApplicationRunner runner = new LocalApplicationRunner(new WordCount(), config);
+ runner.run();
+ runner.waitForFinish();
+}
+{% endhighlight %}
+
+
+Before running `main()`, we will create our input Kafka topic and populate it with sample data. You can download the scripts to interact with Kafka along with the sample data from [here](https://github.com/apache/samza-hello-samza/blob/latest/quickstart/wordcount.tar.gz).
+
+{% highlight bash %}
+> ./scripts/grid install zookeeper && ./scripts/grid start zookeeper
+> ./scripts/grid install kafka && ./scripts/grid start kafka
+{% endhighlight %}
+
+
+{% highlight bash %}
+> ./deploy/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic sample-text --partition 1 --replication-factor 1
+> ./deploy/kafka/bin/kafka-console-producer.sh --topic sample-text --broker localhost:9092 < ./sample-text.txt
+{% endhighlight %}
+
+Let’s kick off our application and use gradle to run it. Alternately, you can also run it directly from your IDE, with the same program arguments.
+
+{% highlight bash %}
+> export BASE_DIR=`pwd`
+> ./gradlew run --args="--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$BASE_DIR/src/main/config/word-count.properties"
+{% endhighlight %}
+
+
+The application will output to a Kafka topic named "word-count-output". We will now fire up a Kafka consumer to read from this topic:
+
+{% highlight bash %}
+>  ./deploy/kafka/bin/kafka-console-consumer.sh --topic word-count-output --zookeeper localhost:2181 --from-beginning
+{% endhighlight %}
+
+It will show the counts for each word like the following:
+
+{% highlight bash %}
+well: 4
+complaining: 1
+die: 3
+but: 22
+not: 50
+truly: 5
+murmuring: 1
+satisfied: 3
+the: 120
+thy: 8
+gods: 8
+thankful: 1
+and: 243
+from: 16
+{% endhighlight %}
+
+Congratulations! You've successfully run your first Samza application.
+
+### [More Examples >>](/startup/code-examples/{{site.version}})
\ No newline at end of file