SAMZA-1856: eventhub descriptors
authorDaniel Chen <dchen1@linkedin.com>
Thu, 11 Oct 2018 18:49:00 +0000 (11:49 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Thu, 11 Oct 2018 18:49:00 +0000 (11:49 -0700)
prateekm lhaiesp
Add system descriptors and input/output descriptors for Eventhubs.
We should make an effort to deprecate the `systems.%s.stream.list`.

Author: Daniel Chen <dchen1@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes #696 from dxichen/eventhub-system-descriptors

15 files changed:
docs/learn/documentation/versioned/azure/eventhubs.md
docs/learn/documentation/versioned/connectors/eventhubs.md
docs/learn/documentation/versioned/jobs/configuration-table.html
docs/learn/documentation/versioned/jobs/samza-configurations.md
samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsInputDescriptor.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java
samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java [new file with mode: 0644]
samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java [new file with mode: 0644]
samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java

index 53fd5eb..18e9ebd 100644 (file)
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Connecting to Eventhubs
+title: Connecting to Event Hubs
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,13 +19,13 @@ title: Connecting to Eventhubs
    limitations under the License.
 -->
 
-You can configure your Samza jobs to process data from [Azure Eventhubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft's data streaming service. An `event hub` is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data). 
+You can configure your Samza jobs to process data from [Azure Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft's data streaming service. An `event hub` is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data). 
 
-### Consuming from EventHubs:
+### Consuming from Event Hubs:
 
 Samza's [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps the EventData into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). The key of the message is set to the partition key of the EventData. The message is obtained from the EventData body. 
 
-To configure Samza to configure from EventHub streams: 
+To configure Samza to configure from Event Hubs streams: 
 
 ```
 # define an event hub system factory with your identifier. eg: eh-system
@@ -46,21 +46,21 @@ systems.eh-system.streams.output0.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
 systems.eh-system.streams.output0.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
 ```
 
-The tuple required to access the Eventhubs entity per stream must be provided, namely the fields `YOUR-STREAM-NAMESPACE`, `YOUR-ENTITY-NAME`, `YOUR-SAS-KEY-NAME`, `YOUR-SAS-KEY-TOKEN`.
+The tuple required to access the Event Hubs entity per stream must be provided, namely the fields `YOUR-STREAM-NAMESPACE`, `YOUR-ENTITY-NAME`, `YOUR-SAS-KEY-NAME`, `YOUR-SAS-KEY-TOKEN`.
 
-### Producing to EventHubs:
+### Producing to Event Hubs:
 
-Similarly, you can also configure your Samza job to write to EventHubs.  
+Similarly, you can also configure your Samza job to write to Event Hubs.  
 ```
 OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("eh-system", "output0"), key, message);
 collector.send(envelope);
 ```
 
-Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to EventHubs.
+Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to Event Hubs.
 
 #### Size limit of partition key:
 
-Note that EventHubs has a limit on the length of partition key (128 characters). In [EventHubSystemProducer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java) we truncate the partition key if the size of the key exceeds the limit.
+Note that Event Hubs has a limit on the length of partition key (128 characters). In [EventHubSystemProducer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java) we truncate the partition key if the size of the key exceeds the limit.
 
 ### Advanced configuration:
 
@@ -82,7 +82,7 @@ systems.eh-system.partition.method = EVENT_HUB_HASHING
 
 ##### Consumer groups: 
 
-Eventhub supports a notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications have their own view of the event stream. Each event hub stream has a pre-defined consumer group named `$Default`. You can define your own consumer group for your job and configure a `eventhubs.consumer.group`  
+Event Hubs supports a notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications have their own view of the event stream. Each event hub stream has a pre-defined consumer group named `$Default`. You can define your own consumer group for your job and configure a `eventhubs.consumer.group`  
 
 ```
 systems.eh-system.streams.eh-input0.eventhubs.consumer.group = my-group
@@ -90,7 +90,7 @@ systems.eh-system.streams.eh-input0.eventhubs.consumer.group = my-group
 
 ##### Serde: 
 
-By default, the messages from EventHubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for `msg.serde` for your stream. 
+By default, the messages from Event Hubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for `msg.serde` for your stream. 
 
 ```
 streams.input0.samza.msg.serde = json
@@ -107,9 +107,9 @@ systems.eh-system.eventhubs.receive.queue.size = 10
 
 For the list of all configs, check out the configuration table page [here](../jobs/configuration-table.html)
 
-### Azure Eventhubs Hello-Samza Example
+### Azure Event Hubs Hello-Samza Example
 
-The [hello-samza](https://github.com/apache/samza-hello-samza) project contains an example of a high level job that consumes and produces to Eventhub using the Zookeeper deployment model.
+The [hello-samza](https://github.com/apache/samza-hello-samza) project contains an example of a high level job that consumes and produces to Event Hubs using the Zookeeper deployment model.
 
 #### Get the Code
 
@@ -141,13 +141,13 @@ If you get a complaint that JAVA_HOME is not set, then you'll need to set it to
 Here are the configs you must set before building the project. Configure these in the `src/main/config/azure-application-local-runner.properties` file.
 
 ```
-# Add your EventHubs input stream credentials here
+# Add your Event Hubs input stream credentials here
 systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
 systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
 systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
 systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
 
-# Add your EventHubs output stream credentials here
+# Add your Event Hubs output stream credentials here
 systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE
 systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME
 systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
index 0f8766b..e805efb 100644 (file)
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Eventhubs Connector
+title: Event Hubs Connector
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -21,13 +21,13 @@ title: Eventhubs Connector
 
 ## Overview
 
-The Samza EventHubs connector provides access to [Azure Eventhubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
+The Samza Event Hubs connector provides access to [Azure Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features), Microsoft’s data streaming service on Azure. An event hub is similar to a Kafka topic and can have multiple partitions with producers and consumers. Each message produced or consumed from an event hub is an instance of [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data).
 
-## Consuming from EventHubs
+## Consuming from Event Hubs
 
-Samza’s [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps the EventData into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). Samza's eventhubs consumer wraps each message from Eventhubs into an EventHubMessageEnvelope. The envelope has two fields of interest - the key, which is set to the event's partition key and the message, which is set to the actual data in the event.
+Samza’s [EventHubSystemConsumer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java) wraps the EventData into an [EventHubIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java). Samza's Event Hubs consumer wraps each message from Event Hubs into an EventHubMessageEnvelope. The envelope has two fields of interest - the key, which is set to the event's partition key and the message, which is set to the actual data in the event.
 
-You can configure your Samza jobs to process data from Azure Eventhubs. To configure Samza to consume from EventHub streams:
+You can configure your Samza jobs to process data from Azure Event Hubs. To configure Samza to consume from Event Hubs streams:
 
 {% highlight jproperties %}
 # define an event hub system factory with your identifier. eg: eh-system
@@ -46,9 +46,9 @@ streams.eh-input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
 
 It is required to provide values for YOUR-STREAM-NAMESPACE, YOUR-ENTITY-NAME, YOUR-SAS-KEY-NAME, YOUR-SAS-KEY-TOKEN to read or write to the stream.
 
-## Producing to EventHubs
+## Producing to Event Hubs
 
-Similarly, you can also configure your Samza job to write to EventHubs. Follow the same configs defined in the Consuming from EventHubs section to write to EventHubs:
+Similarly, you can also configure your Samza job to write to Event Hubs. Follow the same configs defined in the Consuming from Event Hubs section to write to Event Hubs:
 
 {% highlight jproperties %}
 # define an event hub system factory with your identifier. eg: eh-system
@@ -64,14 +64,14 @@ streams.eh-output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME
 streams.eh-output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN
 {% endhighlight %}
 
-Then you can create and produce a message to eventhubs in your code as below:
+Then you can create and produce a message to Event Hubs in your code as below:
 
 {% highlight java %}
 OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("eh-system", "output0"), key, message); 
 collector.send(envelope);
 {% endhighlight %}
 
-Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before sending it to EventHubs.
+Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the message in the envelope. Additionally, the key and the produce timestamp are set as properties in the EventData before sending it to Event Hubs.
 
 ## Advanced configuration
 
@@ -91,7 +91,7 @@ systems.eh-system.partition.method = EVENT_HUB_HASHING
 
 ### Consumer groups
 
-Eventhub supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the consumer group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job by configuring a eventhubs.consumer.group
+Event Hubs supports the notion of [consumer groups](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) which enable multiple applications to have their own view of the event stream. Each partition is exclusively consumed by one consumer in the consumer group. Each event hub stream has a pre-defined consumer group named $Default. You can define your own consumer group for your job by configuring an Event Hubs.consumer.group
 
 {% highlight jproperties %}
 streams.eh-input-stream.eventhubs.consumer.group = my-group
@@ -99,7 +99,7 @@ streams.eh-input-stream.eventhubs.consumer.group = my-group
 
 ### Serde
 
-By default, the messages from EventHubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for msg.serde for your stream.
+By default, the messages from Event Hubs are sent and received as byte arrays. You can configure a serializer and deserializer for your message by setting a value for msg.serde for your stream.
 
 {% highlight jproperties %}
 streams.input0.samza.msg.serde = json
index 44d43f3..c7e987c 100644 (file)
 
                 <tr>
                     <th colspan="3" class="section" id="eventhub">
-                        Using <a href="https://azure.microsoft.com/en-us/services/event-hubs/">EventHubs</a> for input and output streams<br>
+                        Using <a href="https://azure.microsoft.com/en-us/services/event-hubs/">Event Hubs</a> for input and output streams<br>
                         <span class="subtitle">
                             (This section applies if you have set
                             <a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
                 <tr>
                     <td class="property" id="eventhub-stream-list">systems.<span class="system">system-name</span>.<br>stream.list</td>
                     <td class="default"></td>
-                    <td class="description">List of Samza <span class="stream">stream-ids</span> used for the Eventhub system</td>
+                    <td class="description">List of Samza <span class="stream">stream-ids</span> used for the Event Hubs system. Required if not using input/output system descriptors.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-stream-namespace">streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td>
                     <td class="default"></td>
-                    <td class="description">Namespace of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
+                    <td class="description">Namespace of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-stream-entity">streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td>
                     <td class="default"></td>
-                    <td class="description">Entity of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
+                    <td class="description">Entity of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-stream-sas-keyname">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td>
                     <td class="default"></td>
-                    <td class="description">SAS Keyname of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
+                    <td class="description">SAS Keyname of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-stream-sas-token">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td>
                     <td class="default"></td>
-                    <td class="description">SAS Token the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td>
+                    <td class="description">SAS Token of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
                 </tr>
 
                 <tr>
                 <tr>
                     <td class="property" id="eventhub-prefetch-count">systems.<span class="system">system-name</span>.<br>eventhubs.prefetchCount</td>
                     <td class="default">999</td>
-                    <td class="description">Number of events that EventHub client should prefetch from the server. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receiver.setprefetchcount">here </a> for more details.</td>
+                    <td class="description">Number of events that Event Hubs client should prefetch from the server. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receiver.setprefetchcount">here </a> for more details.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-max-event-count">systems.<span class="system">system-name</span>.<br>eventhubs.maxEventCountPerPoll</td>
                     <td class="default">50</td>
-                    <td class="description">Maximum number of events that EventHub client can return in a receive call. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__">here </a> for more details.</td>
+                    <td class="description">Maximum number of events that Event Hubs client can return in a receive call. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__">here </a> for more details.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-runtime-timeout">systems.<span class="system">system-name</span>.<br>eventhubs.runtime.info.timeout</td>
                     <td class="default">60000</td>
-                    <td class="description">Timeout for fetching the runtime metadata from an Eventhub entity on startup in millis.</td>
+                    <td class="description">Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.</td>
                 </tr>
 
                 <tr>
                     <td class="property" id="eventhub-send-partition-method">systems.<span class="system">system-name</span>.<br>eventhubs.partition.method</td>
                     <td class="default"><code>EVENT_HUB_HASHING</code></td>
                     <td class="description">
-                        Producer only config. Configure the method that the message is partitioned for the downstream Eventhub in one of the following ways:
+                        Producer only config. Configure the method that the message is partitioned for the downstream Event Hubs entity in one of the following ways:
                         <dl>
                             <dt><code>ROUND_ROBIN</code></dt>
-                            <dd>The message key and partition key are ignored and the message will be distributed in a round-robin fashion amongst all the partitions in the downstream EventHub.</dd>
+                            <dd>The message key and partition key are ignored and the message will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</dd>
                             <dt><code>EVENT_HUB_HASHING</code></dt>
-                            <dd>Employs the hashing mechanism in EventHubs to determine, based on the key of the message, which partition the message should go. Using this method still ensures that all the events with the same key are sent to the same partition in the event hub. If this option is chosen, the partition key used for the hash should be a string. If the partition key is not set, the message key is used instead.</dd>
+                            <dd>Employs the hashing mechanism in Event Hubs to determine, based on the key of the message, which partition the message should go. Using this method still ensures that all the events with the same key are sent to the same partition in the event hub. If this option is chosen, the partition key used for the hash should be a string. If the partition key is not set, the message key is used instead.</dd>
                             <dt><code>PARTITION_KEY_AS_PARTITION</code></dt>
-                            <dd>Use the integer key specified by the partition key or key of the message to a specific partition on Eventhub. If the integer key is greater than the number of partitions in the destination Eventhub, a modulo operation will be performed to determine the resulting paritition. ie. if there are 6 partitions and the key is 9, the message will end up in partition 3. Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</dd>
+                            <dd>Use the integer key specified by the partition key or key of the message to a specific partition on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity, a modulo operation will be performed to determine the resulting partition. ie. if there are 6 partitions and the key is 9, the message will end up in partition 3. Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</dd>
                         </dl>
                     </td>
                 </tr>
                     <td class="property" id="eventhub-send-key">systems.<span class="system">system-name</span>.<br>eventhubs.send.key</td>
                     <td class="default">true</td>
                     <td class="description">
-                        Producer only config. Sending each message key to the eventhub in the properties of the AMQP message. If the Samza Eventhub consumer is used, this field is used as the message key if the partition key is not present.
+                        Producer only config. If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData message for Event Hubs. The Samza message key will not be sent otherwise. Note: If the Samza Event Hubs consumer is used, the Samza key is the partition key of the received EventData, or the message key if the partition key is not present.
                     </td>
                 </tr>
 
                     <td class="property" id="eventhub-consumer-group">streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td>
                     <td class="default"><code>$Default</code></td>
                     <td class="description">
-                        Consumer only config. Set the consumer group from the upstream Eventhub that the consumer is part of. Defaults to the <code>$Default</code> group that is initially present in all Eventhub entities (unless removed)
+                        Consumer only config. Set the consumer group from the upstream Event Hubs that the consumer is part of. Defaults to the <code>$Default</code> group that is initially present in all Event Hubs entities (unless removed)
                     </td>
                 </tr>
 
                     <td class="property" id="eventhub-consumer-buffer-capacity">systems.<span class="system">system-name</span>.<br>eventhubs.receive.queue.size</td>
                     <td class="default">100</td>
                     <td class="description">
-                        Consumer only config. Per partition capacity of the eventhubs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.
+                        Consumer only config. Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.
                     </td>
                 </tr>
 
index 2a38134..ea76210 100644 (file)
@@ -29,7 +29,7 @@ The following table lists the complete set of properties that can be included in
   + [3.1 Advanced System & Stream Configuration](#advanced-system-stream-configurations)
   + [3.2 Kafka](#kafka)
   + [3.3 HDFS](#hdfs)
-  + [3.4 EventHubs](#eventhubs)
+  + [3.4 Event Hubs](#eventhubs)
   + [3.5 Kinesis](#kinesis)
   + [3.6 ElasticSearch](#elasticsearch)
 * [4. State Storage](#state-storage)
@@ -114,7 +114,7 @@ These are the basic properties for setting up a Samza application.
 |task.checkpoint.<br>segment.bytes|26214400|If you are using Kafka for checkpoints, this is the segment size to be used for the checkpoint topic's log segments. Keeping this number small is useful because it increases the frequency that Kafka will garbage collect old checkpoints.|
 
 ### <a name="systems-streams"></a>[3. Systems & Streams](#systems-streams)
-Samza consumes from and produces to [Streams](../container/streams.html) and has support for a variety of Systems including Kafka, HDFS, Azure EventHubs, Kinesis and ElasticSearch.
+Samza consumes from and produces to [Streams](../container/streams.html) and has support for a variety of Systems including Kafka, HDFS, Azure Event Hubs, Kinesis and ElasticSearch.
 
 |Name|Default|Description|
 |--- |--- |--- |
@@ -184,29 +184,29 @@ More about batch processing [here](../hadoop/overview.html).
 |systems.**_system-name_**.<br>.producer.hdfs.write.batch.size.bytes|268435456|The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.|
 |systems.**_system-name_**.<br>.producer.hdfs.write.batch.size.records|262144|The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.|
 
-#### <a name="eventhubs"></a>[3.4 EventHubs](#eventhubs)
-Configs for consuming and producing to [Azure EventHubs](https://azure.microsoft.com/en-us/services/event-hubs/). This section applies if you have set systems.*.samza.factory = `org.apache.samza.system.eventhub.EventHubSystemFactory`
+#### <a name="eventhubs"></a>[3.4 Event Hubs](#eventhubs)
+Configs for consuming and producing to [Azure Event Hubs](https://azure.microsoft.com/en-us/services/event-hubs/). This section applies if you have set systems.*.samza.factory = `org.apache.samza.system.eventhub.EventHubSystemFactory`
 Documentation and samples found [here](../azure/eventhubs.html)
 
 |Name|Default|Description|
 |--- |--- |--- |
-|systems.**_system-name_**.stream.list| |List of Samza **_stream-id_** used for the Eventhub system|
-|streams.**_stream-id_**.eventhubs.namespace| |Namespace of the associated stream-ids. __Required__ to access the Eventhubs entity per stream.|
-|streams.**_stream-id_**.eventhubs.entitypath| |Entity of the associated stream-ids. __Required__ to access the Eventhubs entity per stream.|
-|sensitive.streams.**_stream-id_**.eventhubs.sas.keyname| |SAS Keyname of the associated stream-ids. __Required__ to access the Eventhubs entity per stream.|
-|sensitive.streams.**_stream-id_**.eventhubs.sas.token| |SAS Token the associated stream-ids. __Required__ to access the Eventhubs entity per stream.|
+|systems.**_system-name_**.stream.list| |List of Samza **_stream-id_** used for the Event Hubs system. __Required__ if not using input/output system descriptors.|
+|streams.**_stream-id_**.eventhubs.namespace| |Namespace of the associated stream-ids. __Required__ to access the Event Hubs entity per stream.|
+|streams.**_stream-id_**.eventhubs.entitypath| |Entity of the associated stream-ids. __Required__ to access the Event Hubs entity per stream.|
+|sensitive.streams.**_stream-id_**.eventhubs.sas.keyname| |SAS Keyname of the associated stream-ids. __Required__ to access the Event Hubs entity per stream.|
+|sensitive.streams.**_stream-id_**.eventhubs.sas.token| |SAS Token of the associated stream-ids. __Required__ to access the Event Hubs entity per stream.|
 
-##### <a name="advanced-eventhubs-configurations"></a>[Advanced EventHubs Configurations](#advanced-eventhubs-configurations)
+##### <a name="advanced-eventhubs-configurations"></a>[Advanced Event Hubs Configurations](#advanced-eventhubs-configurations)
 |Name|Default|Description|
 |--- |--- |--- |
 |streams.**_stream-name_**.<br>eventhubs.numClientThreads|10|Number of threads in thread pool that will be used by the EventHubClient. See [here](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_hub_client.create?view=azure-java-stable) for more details.|
 |systems.**_system-name_**.<br>eventhubs.prefetchCount|999|Number of threads in thread pool that will be used by the EventHubClient. See [here](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_hub_client.create?view=azure-java-stable) for more details.|
-|systems.**_system-name_**.<br>eventhubs.maxEventCountPerPoll|50|Maximum number of events that EventHub client can return in a receive call. See Maximum number of events that EventHub client can return in a receive call. See [here](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount?view=azure-java-stable#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__) for more details. for more details.|
-|systems.**_system-name_**.<br>eventhubs.runtime.info.timeout|60000|Timeout for fetching the runtime metadata from an Eventhub entity on startup in millis.|
-|systems.**_system-name_**.<br>eventhubs.partition.method|`EVENT_HUB_HASHING`|Producer only config. Configure the method that the message is partitioned for the downstream Eventhub in one of the following ways:<br><br>`ROUND_ROBIN` <br>The message key and partition key are ignored and the message will be distributed in a round-robin fashion amongst all the partitions in the downstream EventHub.<br><br>`EVENT_HUB_HASHING` <br>Employs the hashing mechanism in EventHubs to determine, based on the key of the message, which partition the message should go. Using this method still ensures that all the events with the same key are sent to the same partition in the event hub. If this option is chosen, the partition key used for the hash should be a string. If the partition key is not set, the message key is used instead.<br><br>`PARTITION_KEY_AS_PARTITION` <br>Use the integer key specified by the partition key or key of the message to a specific partition on Eventhub. If the integer key is greater than the number of partitions in the destination Eventhub, a modulo operation will be performed to determine the resulting paritition. ie. if there are 6 partitions and the key is 9, the message will end up in partition 3. Similarly to `EVENT_HUB_HASHING`, if the partition key is not set the message key is used instead.|
-|systems.**_system-name_**.<br>eventhubs.send.key|true|Producer only config. Sending each message key to the eventhub in the properties of the AMQP message. If the Samza Eventhub consumer is used, this field is used as the message key if the partition key is not present.|
-|streams.**_stream-id_**.<br>eventhubs.consumer.group|`$Default`|Consumer only config. Set the consumer group from the upstream Eventhub that the consumer is part of. Defaults to the `$Default` group that is initially present in all Eventhub entities (unless removed)|
-|systems.**_system-name_**.<br>eventhubs.receive.queue.size|100|Consumer only config. Per partition capacity of the eventhubs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.|
+|systems.**_system-name_**.<br>eventhubs.maxEventCountPerPoll|50|Maximum number of events that the Event Hubs client can return in a receive call. See Maximum number of events that the Event Hubs client can return in a receive call. See [here](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount?view=azure-java-stable#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__) for more details. for more details.|
+|systems.**_system-name_**.<br>eventhubs.runtime.info.timeout|60000|Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.|
+|systems.**_system-name_**.<br>eventhubs.partition.method|`EVENT_HUB_HASHING`|Producer only config. Configure the method that the message is partitioned for the downstream Eventhub in one of the following ways:<br><br>`ROUND_ROBIN` <br>The message key and partition key are ignored and the message will be distributed in a round-robin fashion amongst all the partitions in the downstream EventHub.<br><br>`EVENT_HUB_HASHING` <br>Employs the hashing mechanism in Event Hubs to determine, based on the key of the message, which partition the message should go. Using this method still ensures that all the events with the same key are sent to the same partition in the event hub. If this option is chosen, the partition key used for the hash should be a string. If the partition key is not set, the message key is used instead.<br><br>`PARTITION_KEY_AS_PARTITION` <br>Use the integer key specified by the partition key or key of the message to a specific partition on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity, a modulo operation will be performed to determine the resulting partition. ie. if there are 6 partitions and the key is 9, the message will end up in partition 3. Similarly to `EVENT_HUB_HASHING`, if the partition key is not set the message key is used instead.|
+|systems.**_system-name_**.<br>eventhubs.send.key|true|Producer only config. If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData message for Event Hubs. The Samza message key will not be sent otherwise. <br> Note: If the Samza Event Hubs consumer is used, the Samza key is the partition key of the received EventData, or the message key if the partition key is not present.|
+|streams.**_stream-id_**.<br>eventhubs.consumer.group|`$Default`|Consumer only config. Set the consumer group from the upstream Event Hubs entity that the consumer is part of. Defaults to the `$Default` group that is initially present in all Event Hubs entities (unless removed)|
+|systems.**_system-name_**.<br>eventhubs.receive.queue.size|100|Consumer only config. Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.|
 
 
 #### <a name="kinesis"></a>[3.5 Kinesis](#kinesis)
index 362745f..24f7932 100644 (file)
@@ -47,18 +47,12 @@ public final class GenericSystemDescriptor extends SystemDescriptor<GenericSyste
     super(systemName, factoryClassName, null, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {
     return new GenericInputDescriptor<>(streamId, this, serde);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {
index 4e1e3bb..d6cdab5 100644 (file)
@@ -79,8 +79,8 @@ public class EventHubConfig extends MapConfig {
   private static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;
   private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD;
 
-  // Each EventHub client maintains single TCP connection. To improve throughput, we will instantiate one
-  // client for each partition. Allow the option to disable the feature in case too many EventHub clients
+  // Each Event Hubs client maintains single TCP connection. To improve throughput, we will instantiate one
+  // client for each partition. Allow the option to disable the feature in case too many Event Hubs clients
   // end up causing unpredictable issues when number of partitions is really high.
   public static final String CONFIG_PER_PARTITION_CONNECTION = "systems.%s.eventhubs.perPartition.connection";
   public static final Boolean DEFAULT_CONFIG_PER_PARTITION_CONNECTION = true;
@@ -151,7 +151,7 @@ public class EventHubConfig extends MapConfig {
 
   /**
    * Get the list of streams that are defined. Each stream has enough
-   * information for connecting to a certain EventHub entity.
+   * information for connecting to a certain Event Hubs entity.
    *
    * @param systemName name of the system
    * @return list of stream names
@@ -161,11 +161,11 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the EventHubs namespace for the stream
+   * Get the Event Hubs namespace for the stream
    *
    * @param systemName name of the system
    * @param streamName name of stream (physical or streamId)
-   * @return EventHubs namespace
+   * @return Event Hubs namespace
    */
   public String getStreamNamespace(String systemName, String streamName) {
     LOG.info("Obtaining name-space for system: {} physical name: {}", systemName, streamName);
@@ -174,11 +174,11 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the EventHubs entity path (topic name) for the stream
+   * Get the Event Hubs entity path (topic name) for the stream
    *
    * @param systemName name of the system
    * @param streamName name of stream (physical or streamId)
-   * @return EventHubs entity path
+   * @return Event Hubs entity path
    */
   public String getStreamEntityPath(String systemName, String streamName) {
     LOG.info("Obtaining entity-path for system: {} physical name: {}", systemName, streamName);
@@ -206,16 +206,16 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the per partition prefetch count for the event hub client
+   * Get the per partition prefetch count for the Event Hubs client
    * @param systemName Name of the system.
-   * @return Per partition Prefetch count for the event hub client.
+   * @return Per partition Prefetch count for the Event Hubs client.
    */
   public Integer getPrefetchCount(String systemName) {
     return getInt(String.format(CONFIG_PREFETCH_COUNT, systemName), DEFAULT_CONFIG_PREFETCH_COUNT);
   }
 
   /**
-   * Get the EventHubs max Message size
+   * Get the Event Hubs max Message size
    *
    * @param systemName name of the system
    * @return the max message size supported in event hubs.
@@ -225,11 +225,11 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the EventHubs SAS (Shared Access Signature) key name for the stream
+   * Get the Event Hubs SAS (Shared Access Signature) key name for the stream
    *
    * @param systemName name of the system
    * @param streamName name of stream (physical or streamId)
-   * @return EventHubs SAS key name
+   * @return Event Hubs SAS key name
    */
   public String getStreamSasKeyName(String systemName, String streamName) {
     return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, streamName),
@@ -237,11 +237,11 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the EventHubs SAS (Shared Access Signature) token for the stream
+   * Get the Event Hubs SAS (Shared Access Signature) token for the stream
    *
    * @param systemName name of the system
    * @param streamName name of stream (physical or streamId)
-   * @return EventHubs SAS token
+   * @return Event Hubs SAS token
    */
   public String getStreamSasToken(String systemName, String streamName) {
     return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, streamName),
@@ -249,18 +249,18 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the EventHubs consumer group used for consumption for the stream
+   * Get the Event Hubs consumer group used for consumption for the stream
    *
    * @param systemName name of the system
    * @param streamName name of stream (physical or streamId)
-   * @return EventHubs consumer group
+   * @return Event Hubs consumer group
    */
   public String getStreamConsumerGroup(String systemName, String streamName) {
     return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
   }
 
   /**
-   * Get the partition method of the systemName. By default partitioning is handed by EventHub.
+   * Get the partition method of the systemName. By default partitioning is handled by Event Hubs.
    *
    * @param systemName name of the system
    * @return The method the producer should use to partition the outgoing data
@@ -287,7 +287,7 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the timeout for the getRuntimeInfo request to EventHub client
+   * Get the timeout for the getRuntimeInfo request to Event Hubs client
    *
    * @param systemName name of the systems
    * @return long, timeout in millis for fetching RuntimeInfo
@@ -298,7 +298,7 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Get the capacity of the Event Hub consumer buffer - the blocking queue used for storing messages
+   * Get the capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages
    *
    * @param systemName name of the system
    * @return int, number of buffered messages per SystemStreamPartition
@@ -312,9 +312,12 @@ public class EventHubConfig extends MapConfig {
   }
 
   /**
-   * Returns whether to create one EventHub client per partition. Each EventHub client maintains
-   * single TCP connection. More EventHub clients will improve throughput in general.
+   * Returns whether to create one Event Hubs client per partition. Each Event Hubs client maintains
+   * single TCP connection. More Event Hubs clients will improve throughput in general.
    * For producer this config is only relevant when partition method is PARTITION_KEY_AS_PARTITION
+   *
+   * @param systemName name of the system
+   * @return true if an Event Hubs client should be created per partition, false otherwise
    */
   public Boolean getPerPartitionConnection(String systemName) {
     String isPerPartitionConnection = get(String.format(CONFIG_PER_PARTITION_CONNECTION, systemName));
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsInputDescriptor.java
new file mode 100644 (file)
index 0000000..d151d16
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+
+/**
+ * A descriptor for the Event Hubs output stream
+ *<p>
+ *   An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ *</p>
+ * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
+ * in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class EventHubsInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, EventHubsInputDescriptor<StreamMessageType>> {
+  private String namespace;
+  private String entityPath;
+  private Optional<String> sasKeyName = Optional.empty();
+  private Optional<String> sasToken = Optional.empty();
+  private Optional<String> consumerGroup = Optional.empty();
+
+  /**
+   * Constructs an {@link InputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param namespace namespace for the Event Hubs entity to consume from, not null
+   * @param entityPath entity path for the Event Hubs entity to consume from, not null
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+      SystemDescriptor systemDescriptor) {
+    super(streamId, serde, systemDescriptor, null);
+    this.namespace = StringUtils.stripToNull(namespace);
+    this.entityPath = StringUtils.stripToNull(entityPath);
+    if (this.namespace == null || this.entityPath == null) {
+      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
+          + "system: {%s}, stream: {%s}", getSystemName(), streamId));
+    }
+  }
+
+  /**
+   * SAS Key name of the associated input stream. Required to access the input Event Hubs entity per stream.
+   *
+   * @param sasKeyName the name of the SAS key required to access the Event Hubs entity
+   * @return this input descriptor
+   */
+  public EventHubsInputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
+    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+    return this;
+  }
+
+  /**
+   * SAS Token of the associated input stream. Required to access the input Event Hubs per stream.
+   *
+   * @param sasToken the SAS token required to access the Event Hubs entity
+   * @return this input descriptor
+   */
+  public EventHubsInputDescriptor<StreamMessageType> withSasKey(String sasToken) {
+    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+    return this;
+  }
+
+  /**
+   * Set the consumer group from the upstream Event Hubs entity that the consumer is part of. Defaults to the
+   * <code>$Default</code> group that is initially present in all Event Hubs entities (unless removed)
+   *
+   * @param consumerGroup the name of the consumer group upstream
+   * @return this input descriptor
+   */
+  public EventHubsInputDescriptor<StreamMessageType> withConsumerGroup(String consumerGroup) {
+    this.consumerGroup = Optional.of(StringUtils.stripToNull(consumerGroup));
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+
+    String streamId = getStreamId();
+
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId), namespace);
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId), entityPath);
+
+    sasKeyName.ifPresent(keyName ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
+    sasToken.ifPresent(key ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
+    this.consumerGroup.ifPresent(consumerGroupName ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId), consumerGroupName));
+    return ehConfigs;
+  }
+
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
new file mode 100644 (file)
index 0000000..e59b4d0
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * A descriptor for an Event Hubs output stream
+ * <p>
+ *   An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
+ * </p>
+ * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
+ * in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class EventHubsOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
+  private String namespace;
+  private String entityPath;
+  private Optional<String> sasKeyName = Optional.empty();
+  private Optional<String> sasToken = Optional.empty();
+
+  /**
+   * Constructs an {@link OutputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param namespace namespace for the Event Hubs entity to produce to, not null
+   * @param entityPath entity path for the Event Hubs entity to produce to, not null
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+      SystemDescriptor systemDescriptor) {
+    super(streamId, serde, systemDescriptor);
+    this.namespace = StringUtils.stripToNull(namespace);
+    this.entityPath = StringUtils.stripToNull(entityPath);
+    if (this.namespace == null || this.entityPath == null) {
+      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+          + "system: {%s}, stream: {%s}", getSystemName(), streamId));
+    }
+  }
+
+  /**
+   * SAS Key name of the associated output stream. Required to access the output Event Hubs entity per stream.
+   *
+   * @param sasKeyName the name of the SAS key required to access the Event Hubs entity
+   * @return this output descriptor
+   */
+  public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
+    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+    return this;
+  }
+
+  /**
+   * SAS Token of the associated output stream. Required to access the output Event Hubs per stream.
+   *
+   * @param sasToken the SAS token required to access to Event Hubs entity
+   * @return this output descriptor
+   */
+  public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
+    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+
+    String streamId = getStreamId();
+
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId), namespace);
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId), entityPath);
+    sasKeyName.ifPresent(keyName ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
+    sasToken.ifPresent(key ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
+    return ehConfigs;
+  }
+
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
new file mode 100644 (file)
index 0000000..189340f
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+
+
+/**
+ * A descriptor for a Event Hubs system.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ */
+public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
+  private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
+
+  private List<String> streamIds = new ArrayList<>();
+  private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
+  private Optional<Integer> numClientThreads = Optional.empty();
+  private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
+  private Optional<Integer> consumerMaxEventCountPerPoll = Optional.empty();
+  private Optional<Integer> consumerPrefetchCount = Optional.empty();
+  private Optional<Boolean> producerEventhubsSendKey = Optional.empty();
+  private Optional<PartitioningMethod> producerEventhubsPartitioningMethod = Optional.empty();
+
+  /**
+   * Constructs a {@link SystemDescriptor} instance.
+   *  @param systemName name of this system
+   */
+  public EventHubsSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+  }
+
+  /**
+   * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
+   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+   * <p>
+   * The type of messages in the stream is the type of the provided stream level serde.
+   *
+   * @param streamId id of the input stream
+   * @param namespace namespace of the Event Hubs entity to consume from
+   * @param entityPath entity path of the Event Hubs entity to consume from
+   * @param serde stream level serde for the input stream
+   * @param <StreamMessageType> type of messages in this stream
+   * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
+   */
+  public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<StreamMessageType> serde) {
+    streamIds.add(streamId);
+    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
+  }
+
+  /**
+   * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
+   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+   * <p>
+   * The type of messages in the stream is the type of the provided stream level serde.
+   *
+   * @param streamId id of the output stream
+   * @param namespace namespace of the Event Hubs entity to produce to
+   * @param entityPath entity path of the Event Hubs entity to produce to
+   * @param serde stream level serde for the output stream
+   * @param <StreamMessageType> type of the messages in this stream
+   * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
+   */
+  public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<StreamMessageType> serde) {
+    streamIds.add(streamId);
+    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
+  }
+
+  /**
+   * Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.
+   *
+   * @param timeoutMS the timeout in ms for getting runtime information from the Event Hubs system
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withRuntimeInfoTimeout(int timeoutMS) {
+    this.fetchRuntimeInfoTimeout = Optional.of(timeoutMS);
+    return this;
+  }
+
+  /**
+   * Number of threads in thread pool that will be used by the EventHubClient.
+   *
+   * @param numClientThreads the number of threads
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withNumClientThreads(int numClientThreads) {
+    this.numClientThreads = Optional.of(numClientThreads);
+    return this;
+  }
+
+  /**
+   *  Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages.
+   *  Larger buffer capacity typically leads to better throughput but consumes more memory.
+   *
+   * @param receiveQueueSize the number of messages from Event Hubs that should be buffered in the
+   *                      {@link org.apache.samza.util.BlockingEnvelopeMap}
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withReceiveQueueSize(int receiveQueueSize) {
+    this.consumerReceiveQueueSize = Optional.of(receiveQueueSize);
+    return this;
+  }
+
+  /**
+   * Maximum number of events that Event Hubs client can return in a receive call.
+   *
+   * @param count the number of max events per poll
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withMaxEventCountPerPoll(int count) {
+    this.consumerMaxEventCountPerPoll = Optional.of(count);
+    return this;
+  }
+
+  /**
+   * Number of events that Event Hubs client should prefetch from the server.
+   *
+   * @param count the number of events that should be prefetched.
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withPrefetchCount(int count) {
+    this.consumerPrefetchCount = Optional.of(count);
+    return this;
+  }
+
+
+  /**
+   * Configure the method that the message is partitioned for the downstream Event Hubs in one of the following ways:
+   * <ul>
+   *   <li>ROUND_ROBIN:
+   *   The message key and partition key are ignored and the message
+   *   will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</li>
+   *   <li>EVENT_HUB_HASHING:
+   *   Employs the hashing mechanism in Event Hubs to determine, based on the key of the message,
+   *   which partition the message should go. Using this method still ensures that all the events with
+   *   the same key are sent to the same partition in the event hub. If this option is chosen, the partition
+   *   key used for the hash should be a string. If the partition key is not set, the message key is
+   *   used instead.</li>
+   *   <li>PARTITION_KEY_AS_PARTITION:
+   *   Use the integer key specified by the partition key or key of the message to a specific partition
+   *   on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity,
+   *   a modulo operation will be performed to determine the resulting paritition.
+   *   ie. if there are 6 partitions and the key is 9, the message will end up in partition 3.
+   *   Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</li>
+   * </ul>
+   * @param partitioningMethod the desired partitioning method for the message in the downstream Event Hubs entity
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withPartitioningMethod(PartitioningMethod partitioningMethod) {
+    this.producerEventhubsPartitioningMethod = Optional.ofNullable(partitioningMethod);
+    return this;
+  }
+
+  /**
+   *  If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData
+   *  message for Event Hubs. The Samza message key will not be sent otherwise.
+   *  Note: If the Samza Event Hubs consumer is used, this field is the partition key of the received EventData, or the
+   *  message key if the partition key is not present.
+   *
+   * @param sendKeys set to true if the message key should be sent in the EventData properties, the key is not sent otherwise
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withSendKeys(boolean sendKeys) {
+    this.producerEventhubsSendKey = Optional.of(sendKeys);
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
+    String systemName = getSystemName();
+
+    if (!this.streamIds.isEmpty()) {
+      ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), String.join(",", this.streamIds));
+    }
+    this.fetchRuntimeInfoTimeout.ifPresent(timeout ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName), Integer.toString(timeout)));
+    this.numClientThreads.ifPresent(numClientThreads ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), Integer.toString(numClientThreads)));
+    this.consumerReceiveQueueSize.ifPresent(receiveQueueSize ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName), Integer.toString(receiveQueueSize)));
+    this.consumerMaxEventCountPerPoll.ifPresent(count ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), Integer.toString(count)));
+    this.consumerPrefetchCount.ifPresent(count ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName), Integer.toString(count)));
+    this.producerEventhubsSendKey.ifPresent(sendKeys ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName), Boolean.toString(sendKeys)));
+    this.producerEventhubsPartitioningMethod.ifPresent(partitioningMethod ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), partitioningMethod.toString()));
+    return ehConfigs;
+  }
+}
index 307b8f6..26b58d6 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.eventhub.EventHubConfig;
 import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ public abstract class AsyncSystemProducer implements SystemProducer {
   /**
    * The constant CONFIG_STREAM_LIST. This config is used to get the list of streams produced by this EventHub system.
    */
-  public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";
+  public static final String CONFIG_STREAM_LIST = EventHubConfig.CONFIG_STREAM_LIST;
 
   private static final String SEND_ERRORS = "sendErrors";
   private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
new file mode 100644 (file)
index 0000000..b3003a3
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.Map;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+
+public class TestEventHubsInputDescriptor {
+  @Test
+  public void testEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+        .withSasKeyName("secretkey")
+        .withSasKey("sasToken-123")
+        .withConsumerGroup("$notdefault");
+
+    Map<String, String> generatedConfigs = inputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+    assertEquals("$notdefault", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+  }
+
+  @Test
+  public void testWithoutEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = inputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
+  }
+
+  @Test
+  public void testMissingInputDescriptorFields() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    try {
+      systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      fail("Should have thrown Config Exception");
+    } catch (ConfigException exception) {
+      assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
+          + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
+    }
+  }
+}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
new file mode 100644 (file)
index 0000000..fcfcdca
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.Map;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+public class TestEventHubsOutputDescriptor {
+  @Test
+  public void testEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "output-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+        .withSasKeyName("secretkey")
+        .withSasKey("sasToken-123");
+
+    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+  }
+
+  @Test
+  public void testWithoutEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "output-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
+  }
+
+  @Test
+  public void testMissingOutputDescriptorFields() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    try {
+      systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      fail("Should have thrown Config Exception");
+    } catch (ConfigException exception) {
+      assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+          + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
+    }
+  }
+}
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
new file mode 100644 (file)
index 0000000..33bb1ba
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.Map;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestEventHubsSystemDescriptor {
+  @Test
+  public void testWithDescriptorOverrides() {
+    String systemName = "system-name";
+    String streamId1 = "input-stream1";
+    String streamId2 = "input-stream2";
+    String streamId3 = "output-stream1";
+    String streamId4 = "output-stream2";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName)
+        .withMaxEventCountPerPoll(1000)
+        .withNumClientThreads(5)
+        .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION)
+        .withPrefetchCount(100)
+        .withReceiveQueueSize(500)
+        .withRuntimeInfoTimeout(60000)
+        .withSendKeys(false);
+
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertEquals("1000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
+    assertEquals("5", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
+    assertEquals("PARTITION_KEY_AS_PARTITION", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
+    assertEquals("100", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
+    assertEquals("500", generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
+    assertEquals("60000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
+    assertEquals("false", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
+    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
+  }
+
+  @Test
+  public void testWithoutDescriptorOverrides() {
+    String systemName = "eventHub";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
+    assertEquals(1, generatedConfigs.size());
+  }
+  @Test
+  public void testWithInputOutputStreams() {
+    String systemName = "system-name";
+    String streamId1 = "input-stream1";
+    String streamId2 = "input-stream2";
+    String streamId3 = "output-stream1";
+    String streamId4 = "output-stream2";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
+    assertEquals(2, generatedConfigs.size());
+  }
+}
index 16accae..6c4ae49 100644 (file)
@@ -50,18 +50,12 @@ public final class DelegatingSystemDescriptor extends SystemDescriptor<Delegatin
     super(systemName, null, null, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {
     return new GenericInputDescriptor<>(streamId, this, serde);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {
index 6fa8915..e88ed70 100644 (file)
@@ -69,17 +69,11 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
     super(systemName, FACTORY_CLASS_NAME, null, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> KafkaInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, Serde<StreamMessageType> serde) {
     return new KafkaInputDescriptor<>(streamId, this, serde, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> KafkaOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) {
     return new KafkaOutputDescriptor<>(streamId, this, serde);