SAMZA-2041: add hdfs and kinesis descriptor
authorHai Lu <halu@linkedin.com>
Fri, 14 Dec 2018 22:35:45 +0000 (14:35 -0800)
committerHai Lu <halu@linkedin.com>
Fri, 14 Dec 2018 22:35:45 +0000 (14:35 -0800)
Author: Hai Lu <halu@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes #857 from lhaiesp/master

docs/learn/documentation/versioned/connectors/hdfs.md
docs/learn/documentation/versioned/connectors/kinesis.md
samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java [new file with mode: 0644]
samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java [new file with mode: 0644]
samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java [new file with mode: 0644]
samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java [new file with mode: 0644]
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java [new file with mode: 0644]
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java [new file with mode: 0644]
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java [new file with mode: 0644]
samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java [new file with mode: 0644]

index ece7bbf..822be7e 100644 (file)
@@ -47,14 +47,11 @@ While streaming sources like Kafka are unbounded, files on HDFS have finite data
 
 #### Defining streams
 
-Samza uses the notion of a _system_ to describe any I/O source it interacts with. To consume from HDFS, you should create a new system that points to - `HdfsSystemFactory`. You can then associate multiple streams with this _system_. Each stream should have a _physical name_, which should be set to the name of the directory on HDFS.
-
-{% highlight jproperties %}
-systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
-
-streams.hdfs-clickstream.samza.system=hdfs
-streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11
+In Samza high level API, you can use `HdfsSystemDescriptor` to create a HDFS system. The stream name should be set to the name of the directory on HDFS.
 
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream");
+HdfsInputDescriptor hid = hsd.getInputDescriptor("/data/clickstream/2016/09/11");
 {% endhighlight %}
 
 The above example defines a stream called `hdfs-clickstream` that reads data from the `/data/clickstream/2016/09/11` directory. 
@@ -62,9 +59,10 @@ The above example defines a stream called `hdfs-clickstream` that reads data fro
 #### Whitelists & Blacklists
 If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the _whitelist_ selects the files to be filtered and the _blacklist_ is later applied on its results. 
 
-{% highlight jproperties %}
-systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro
-systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+                                        .withConsumerWhiteList(".*avro")
+                                        .withConsumerBlackList("somefile.avro");
 {% endhighlight %}
 
 
@@ -74,34 +72,34 @@ systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro
 
 Samza allows writing your output results to HDFS in AVRO format. You can either use avro's GenericRecords or have Samza automatically infer the schema for your object using reflection. 
 
-{% highlight jproperties %}
-# set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs'
-systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
-systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+                                        .withWriterClassName(AvroDataFileHdfsWriter.class.getName());
 {% endhighlight %}
 
 
-If your output is non-avro, you can describe its format by implementing your own serializer.
-{% highlight jproperties %}
-systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
-serializers.registry.my-serde-name.class=MySerdeFactory
-systems.hdfs.samza.msg.serde=my-serde-name
+If your output is non-avro, use `TextSequenceFileHdfsWriter`.
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+                                        .withWriterClassName(TextSequenceFileHdfsWriter.class.getName());
 {% endhighlight %}
 
 
 #### Output directory structure
 
 Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter. 
-{% highlight jproperties %}
-systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data
-systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+                                        .withOutputBaseDir("/user/me/analytics/clickstream_data")
+                                        .withDatePathFormat("yyyy_MM_dd");
 {% endhighlight %}
 
 You can configure the maximum size of each file or the maximum number of records per-file. Once either limits have been reached, Samza will create a new file.
 
-{% highlight jproperties %}
-systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728
-systems.hdfs.producer.hdfs.write.batch.size.records=10000
+{% highlight java %}
+HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream")
+                                        .withWriteBatchSizeBytes(134217728)
+                                        .withWriteBatchSizeRecords(10000);
 {% endhighlight %}
 
 ### Security 
index e319e92..57dae9c 100644 (file)
@@ -36,22 +36,16 @@ wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apac
 
 #### Basic Configuration
 
-Here is the required configuration for consuming messages from Kinesis. 
-
-{% highlight jproperties %}
-// Define a Kinesis system factory with your identifier. eg: kinesis-system
-systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
-
-// Kinesis consumer works with only AllSspToSingleTaskGrouperFactory
-job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
-
-// Define your streams
-task.inputs=kinesis-system.input0
-
-// Define required properties for your streams
-systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
-systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
-sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
+Here is the required configuration for consuming messages from Kinesis, through `KinesisSystemDescriptor` and `KinesisInputDescriptor`. 
+
+{% highlight java %}
+KinesisSystemDescriptor ksd = new KinesisSystemDescriptor("kinesis");
+    
+KinesisInputDescriptor<KV<String, byte[]>> kid = 
+    ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
+          .withRegion("STREAM-REGION")
+          .withAccessKey("YOUR-ACCESS_KEY")
+          .withSecretKey("YOUR-SECRET-KEY");
 {% endhighlight %}
 
 ####Coordination
@@ -66,10 +60,12 @@ job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.str
 
 Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An Access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`) which you can use to send programmatic requests to AWS. 
 
-{% highlight jproperties %}
-systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
-systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
-sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = 
+    ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
+          .withRegion("STREAM-REGION")
+          .withAccessKey("YOUR-ACCESS_KEY")
+          .withSecretKey("YOUR-SECRET-KEY");
 {% endhighlight %}
 
 ### Advanced Configuration
@@ -77,29 +73,44 @@ sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 #### Kinesis Client Library Configs
 Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
 (KCL) to access the Kinesis data streams. You can set any [KCL Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-for a stream by configuring it with the **systems.system-name.streams.stream-name.aws.kcl.*** prefix.
+for a stream by configuring it through `KinesisInputDescriptor`.
 
-{% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>;
+kclConfig.put("CONFIG-PARAM", "CONFIG-VALUE");
+
+kid.withKCLConfig(kclConfig);
 {% endhighlight %}
 
 As an example, the below configuration is equivalent to invoking `kclClient#WithTableName(myTable)` on the KCL instance.
-{% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.TableName=myTable
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>;
+kclConfig.put("TableName", "myTable");
+
+kid.withKCLConfig(kclConfig);
 {% endhighlight %}
 
 #### AWS Client configs
 Samza allows you to specify any [AWS client configs](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) to connect to your Kinesis instance.
-You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) with the `systems.your-system-name.aws.clientConfig.*` prefix.
+You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) through `KinesisSystemDescriptor`.
 
-{% highlight jproperties %}
-systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
+{% highlight java %}
+Map<String, String> awsConfig = new HashMap<>;
+awsConfig.put("CONFIG-PARAM", "CONFIG-VALUE");
+
+KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
+                                          .withAWSConfig(awsConfig);
 {% endhighlight %}
 
-As an example, to set the *proxy host* and *proxy port* to be used by the Kinesis Client:
-{% highlight jproperties %}
-systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
-systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+Through `KinesisSystemDescriptor` you can also set the *proxy host* and *proxy port* to be used by the Kinesis Client:
+{% highlight java %}
+KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
+                                          .withProxyHost("YOUR-PROXY-HOST")
+                                          .withProxyPort(YOUR-PROXY-PORT);
 {% endhighlight %}
 
 ### Resetting Offsets
@@ -109,14 +120,37 @@ These checkpoints are stored and managed by the KCL library internally. You can
 
 {% highlight jproperties %}
 // change the TableName to a unique name to reset checkpoints.
-systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
+systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName=my-app-table-name
+{% endhighlight %}
+
+Or through `KinesisInputDescriptor`
+
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>;
+kclConfig.put("TableName", "my-new-app-table-name");
+
+kid.withKCLConfig(kclConfig);
 {% endhighlight %}
 
+
 When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.  
 
 {% highlight jproperties %}
 // set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
-systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=LATEST
+systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream=LATEST
+{% endhighlight %}
+
+Or through `KinesisInputDescriptor`
+
+{% highlight java %}
+KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+
+Map<String, String> kclConfig = new HashMap<>;
+kclConfig.put("InitialPositionInStream", "LATEST");
+
+kid.withKCLConfig(kclConfig);
 {% endhighlight %}
 
 Alternately, if you want to start from a particular offset in the Kinesis stream, you can login to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB Table.
index d11096f..e0c9099 100644 (file)
@@ -54,20 +54,20 @@ import com.amazonaws.ClientConfiguration;
 public class KinesisConfig extends MapConfig {
   private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());
 
-  private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
-  private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
+  public static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
+  public static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
 
-  private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
-  private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
+  public static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
+  public static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
 
-  private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
-  private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
-  private static final String DEFAULT_CONFIG_PROXY_HOST = "";
-  private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
-  private static final int DEFAULT_CONFIG_PROXY_PORT = 0;
+  public static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
+  public static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
+  public static final String DEFAULT_CONFIG_PROXY_HOST = "";
+  public static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
+  public static final int DEFAULT_CONFIG_PROXY_PORT = 0;
 
-  private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
-  private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
+  public static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
+  public static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
 
   public KinesisConfig(Config config) {
     super(config);
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
new file mode 100644 (file)
index 0000000..1c2e0a2
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.kinesis.KinesisConfig;
+
+
+/**
+ * A {@link KinesisInputDescriptor} can be used for specifying Samza and Kinesis specific properties of Kinesis
+ * input streams.
+ * <p>
+ * Use {@link KinesisSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class KinesisInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, KinesisInputDescriptor<StreamMessageType>> {
+  private Optional<String> accessKey = Optional.empty();
+  private Optional<String> secretKey = Optional.empty();
+  private Optional<String> region = Optional.empty();
+  private Map<String, String> kclConfig = Collections.emptyMap();
+
+
+  /**
+   * Constructs an {@link InputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param valueSerde serde the values in the messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  <T> KinesisInputDescriptor(String streamId, Serde<T> valueSerde, SystemDescriptor systemDescriptor) {
+    super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null);
+  }
+
+  /**
+   * Kinesis region for the system stream.
+   * @param region Kinesis region
+   * @return this input descriptor
+   */
+  public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
+    this.region = Optional.of(StringUtils.stripToNull(region));
+    return this;
+  }
+
+  /**
+   * Kinesis access key name for the system stream.
+   * @param accessKey Kinesis access key name
+   * @return this input descriptor
+   */
+  public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) {
+    this.accessKey = Optional.of(StringUtils.stripToNull(accessKey));
+    return this;
+  }
+
+  /**
+   * Kinesis secret key name for the system stream.
+   * @param secretKey Kinesis secret key
+   * @return this input descriptor
+   */
+  public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) {
+    this.secretKey = Optional.of(StringUtils.stripToNull(secretKey));
+    return this;
+  }
+
+  /**
+   * KCL (Kinesis Client Library) config for the system stream. This is not required by default.
+   * @param kclConfig A map of specified KCL configs
+   * @return this input descriptor
+   */
+  public KinesisInputDescriptor<StreamMessageType> withKCLConfig(Map<String, String> kclConfig) {
+    this.kclConfig = kclConfig;
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> config = new HashMap<>(super.toConfig());
+
+    String systemName = getSystemName();
+    String streamId = getStreamId();
+    String clientConfigPrefix =
+        String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
+
+    this.region.ifPresent(
+        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
+    this.accessKey.ifPresent(
+        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
+    this.secretKey.ifPresent(
+        val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
+    this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
+
+    return config;
+  }
+}
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
new file mode 100644 (file)
index 0000000..ffeb667
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.KinesisSystemFactory;
+
+
+/**
+ * A {@link KinesisSystemDescriptor} can be used for specifying Samza and Kinesis-specific properties of a Kinesis
+ * input system. It can also be used for obtaining {@link KinesisInputDescriptor}s,
+ * which can be used for specifying Samza and system-specific properties of Kinesis input streams.
+ * <p>
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
+ */
+public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescriptor> {
+  private static final String FACTORY_CLASS_NAME = KinesisSystemFactory.class.getName();
+
+  private Optional<String> region = Optional.empty();
+  private Optional<String> proxyHost = Optional.empty();
+  private Optional<Integer> proxyPort = Optional.empty();
+  private Map<String, String> awsConfig = Collections.emptyMap();
+  private Map<String, String> kclConfig = Collections.emptyMap();
+
+  public KinesisSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+  }
+
+  /**
+   * Gets an {@link KinesisInputDescriptor} for the input stream of this system.
+   * <p>
+   * The message in the stream will have {@link String} keys and {@code ValueType} values.
+   *
+   * @param streamId id of the input stream
+   * @param valueSerde stream level serde for the values in the messages in the input stream
+   * @param <ValueType> type of the value in the messages in this stream
+   * @return an {@link KinesisInputDescriptor} for the Kinesis input stream
+   */
+  public <ValueType> KinesisInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId,
+      Serde<ValueType> valueSerde) {
+    return new KinesisInputDescriptor<>(streamId, valueSerde, this);
+  }
+
+  /**
+   * Kinesis region for this system.
+   * @param region Kinesis region
+   * @return this system descriptor
+   */
+  public KinesisSystemDescriptor withRegion(String region) {
+    this.region = Optional.of(StringUtils.stripToNull(region));
+    return this;
+  }
+
+  /**
+   * AWS config for this system. This is not required by default.
+   * @param awsConfig A map of specified AWS configs
+   * @return this system descriptor
+   */
+  public KinesisSystemDescriptor withAWSConfig(Map<String, String> awsConfig) {
+    this.awsConfig = awsConfig;
+    return this;
+  }
+
+  /**
+   * KCL (Kinesis Client Library) config for this system. This is not required by default.
+   * @param kclConfig A map of specified KCL configs
+   * @return this system descriptor
+   */
+  public KinesisSystemDescriptor withKCLConfig(Map<String, String> kclConfig) {
+    this.kclConfig = kclConfig;
+    return this;
+  }
+
+  /**
+   * Proxy host to be used for this system.
+   * @param proxyHost Proxy host
+   * @return this system descriptor
+   */
+  public KinesisSystemDescriptor withProxyHost(String proxyHost) {
+    this.proxyHost = Optional.of(StringUtils.stripToNull(proxyHost));
+    return this;
+  }
+
+  /**
+   * Proxy port to be used for this system.
+   * @param proxyPort Proxy port
+   * @return this system descriptor
+   */
+  public KinesisSystemDescriptor withProxyPort(int proxyPort) {
+    this.proxyPort = Optional.of(proxyPort);
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> config = new HashMap<>(super.toConfig());
+    String systemName = getSystemName();
+
+    this.region.ifPresent(
+        val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
+    this.proxyHost.ifPresent(
+        val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
+    this.proxyPort.ifPresent(
+        val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
+
+    final String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
+    this.kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
+
+    final String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
+    this.awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
+
+    return config;
+  }
+}
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java
new file mode 100644 (file)
index 0000000..f0a8a3b
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKinesisInputDescriptor {
+  @Test
+  public void testConfigGeneration() {
+    String systemName = "kinesis";
+    String streamName = "Seine";
+    KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName);
+    Map<String, String> cliConfig = new HashMap<>();
+    cliConfig.put("key1", "value1");
+    KinesisInputDescriptor<KV<String, byte[]>> id = sd.getInputDescriptor(streamName, new NoOpSerde<byte[]>())
+        .withRegion("Paris")
+        .withAccessKey("accessKey")
+        .withSecretKey("secretKey")
+        .withKCLConfig(cliConfig);
+
+    Map<String, String> generatedConfig = id.toConfig();
+    Assert.assertEquals(5, generatedConfig.size());
+
+    Assert.assertEquals(systemName, generatedConfig.get("streams.Seine.samza.system"));
+    Assert.assertEquals("Paris",
+        generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamName)));
+    Assert.assertEquals("accessKey",
+        generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamName)));
+    Assert.assertEquals("secretKey",
+        generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamName)));
+    Assert.assertEquals("value1", generatedConfig.get(
+        String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamName) + "key1"));
+  }
+}
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java
new file mode 100644 (file)
index 0000000..f12dad1
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kinesis.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.KinesisSystemFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKinesisSystemDescriptor {
+  @Test
+  public void testConfigGeneration() {
+    String systemName = "kinesis";
+    Map<String, String> kclConfig = new HashMap<>();
+    kclConfig.put("key1", "value1");
+    Map<String, String> awsConfig = new HashMap<>();
+    awsConfig.put("key2", "value2");
+
+    KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName).withRegion("London")
+        .withProxyHost("US")
+        .withProxyPort(1776)
+        .withAWSConfig(awsConfig)
+        .withKCLConfig(kclConfig);
+
+    Map<String, String> generatedConfig = sd.toConfig();
+    Assert.assertEquals(6, generatedConfig.size());
+
+    Assert.assertEquals(KinesisSystemFactory.class.getName(), generatedConfig.get("systems.kinesis.samza.factory"));
+    Assert.assertEquals("London", generatedConfig.get(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName)));
+    Assert.assertEquals("US", generatedConfig.get(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName)));
+    Assert.assertEquals("1776", generatedConfig.get(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName)));
+    Assert.assertEquals("value1",
+        generatedConfig.get(String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName) + "key1"));
+    Assert.assertEquals("value2",
+        generatedConfig.get(String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName) + "key2"));
+  }
+}
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java
new file mode 100644 (file)
index 0000000..e3e3fa4
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+
+/**
+ * A {@link HdfsInputDescriptor} can be used for specifying Samza and HDFS specific properties of HDFS
+ * input streams.
+ * <p>
+ * Use {@link HdfsSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ *
+ */
+public class HdfsInputDescriptor
+    extends InputDescriptor<Object, HdfsInputDescriptor> {
+
+  /**
+   * Constructs an {@link InputDescriptor} instance. Hdfs input has no key. Value type is determined by
+   * reader type (see {@link HdfsSystemDescriptor#withReaderType}).
+   *
+   * @param streamId id of the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  HdfsInputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+    super(streamId, new NoOpSerde(), systemDescriptor, null);
+  }
+}
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java
new file mode 100644 (file)
index 0000000..7b7e118
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+/**
+ * A {@link HdfsOutputDescriptor} can be used for specifying Samza and HDFS-specific properties of HDFS
+ * output streams.
+ * <p>
+ * Use {@link HdfsSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor.
+ * <p>
+ * Stream properties provided in configuration override corresponding properties specified using a descriptor.
+ */
+public class HdfsOutputDescriptor
+    extends OutputDescriptor<Object, HdfsOutputDescriptor> {
+
+  /**
+   * Constructs an {@link OutputDescriptor} instance. Hdfs output has no key. Value type is determined by
+   * writer class (see {@link HdfsSystemDescriptor#withWriterClassName}).
+   *
+   * @param streamId id of the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  HdfsOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+    super(streamId, new NoOpSerde(), systemDescriptor);
+  }
+}
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java
new file mode 100644 (file)
index 0000000..f4d8566
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.apache.samza.system.hdfs.HdfsSystemFactory;
+
+
+/**
+ * A {@link HdfsSystemDescriptor} can be used for specifying Samza and HDFS-specific properties of a HDFS
+ * input/output system. It can also be used for obtaining {@link HdfsInputDescriptor}s and
+ * {@link HdfsOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of
+ * HDFS input/output streams.
+ * <p>
+ * System properties provided in configuration override corresponding properties specified using a descriptor.
+ */
+public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> {
+  private static final String FACTORY_CLASS_NAME = HdfsSystemFactory.class.getName();
+
+  private Optional<String> datePathFormat = Optional.empty();
+  private Optional<String> outputBaseDir = Optional.empty();
+  private Optional<Long> writeBatchSizeBytes = Optional.empty();
+  private Optional<Long> writeBatchSizeRecords = Optional.empty();
+  private Optional<String> writeCompressionType = Optional.empty();
+  private Optional<String> writerClass = Optional.empty();
+
+  private Optional<Long> consumerBufferCapacity = Optional.empty();
+  private Optional<Long> consumerMaxRetries = Optional.empty();
+  private Optional<String> consumerWhiteList = Optional.empty();
+  private Optional<String> consumerBlackList = Optional.empty();
+  private Optional<String> consumerGroupPattern = Optional.empty();
+  private Optional<String> consumerReader = Optional.empty();
+  private Optional<String> consumerStagingDirectory = Optional.empty();
+
+  public HdfsSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+  }
+
+  /**
+   * Gets an {@link HdfsInputDescriptor} for the input stream of this system.
+   * <p>
+   * The message in the stream has no key and the value type is determined by reader type.
+   *
+   * @param streamId id of the input stream
+   * @return an {@link HdfsInputDescriptor} for the hdfs input stream
+   */
+  public HdfsInputDescriptor getInputDescriptor(String streamId) {
+    return new HdfsInputDescriptor(streamId, this);
+  }
+
+  /**
+   * Gets an {@link HdfsOutputDescriptor} for the output stream of this system.
+   * <p>
+   * The message in the stream has no key and the value type is determined by writer class.
+   *
+   * @param streamId id of the output stream
+   * @return an {@link HdfsOutputDescriptor} for the hdfs output stream
+   */
+  public HdfsOutputDescriptor getOutputDescriptor(String streamId) {
+    return new HdfsOutputDescriptor(streamId, this);
+  }
+
+  /**
+   * In an HdfsWriter implementation that performs time-based output bucketing,
+   * the user may configure a date format (suitable for inclusion in a file path)
+   * using <code>SimpleDateFormat</code> formatting that the Bucketer implementation will
+   * use to generate HDFS paths and filenames. The more granular this date format, the more
+   * often a bucketing HdfsWriter will begin a new date-path bucket when creating the next output file.
+   * @param datePathFormat date path format
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) {
+    this.datePathFormat = Optional.of(StringUtils.stripToNull(datePathFormat));
+    return this;
+  }
+
+  /**
+   * The base output directory into which all HDFS output for this job will be written.
+   * @param outputBaseDir output base directory
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) {
+    this.outputBaseDir = Optional.of(StringUtils.stripToNull(outputBaseDir));
+    return this;
+  }
+
+  /**
+   * Split output files from all writer tasks based on # of bytes written to optimize
+   * MapReduce utilization for Hadoop jobs that will process the data later.
+   * @param writeBatchSizeBytes write batch size in bytes.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withWriteBatchSizeBytes(long writeBatchSizeBytes) {
+    this.writeBatchSizeBytes = Optional.of(writeBatchSizeBytes);
+    return this;
+  }
+
+  /**
+   * Split output files from all writer tasks based on # of bytes written to optimize
+   * MapReduce utilization for Hadoop jobs that will process the data later.
+   * @param writeBatchSizeRecords write batch size in records.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withWriteBatchSizeRecords(long writeBatchSizeRecords) {
+    this.writeBatchSizeRecords = Optional.of(writeBatchSizeRecords);
+    return this;
+  }
+
+  /**
+   * Simple, human-readable label for various compression options. HdfsWriter implementations
+   * can choose how to handle these individually, or throw an exception. Example: "none", "gzip", ...
+   * @param writeCompressionType compression type for writer.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) {
+    this.writeCompressionType = Optional.of(StringUtils.stripToNull(writeCompressionType));
+    return this;
+  }
+
+  /**
+   * The fully-qualified class name of the HdfsWriter subclass that will write for this system.
+   * @param writerClassName writer class name.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withWriterClassName(String writerClassName) {
+    this.writerClass = Optional.of(StringUtils.stripToNull(writerClassName));
+    return this;
+  }
+
+  /**
+   * The capacity of the hdfs consumer buffer - the blocking queue used for storing messages.
+   * @param bufferCapacity the buffer capacity for HDFS consumer.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withConsumerBufferCapacity(long bufferCapacity) {
+    this.consumerBufferCapacity = Optional.of(bufferCapacity);
+    return this;
+  }
+
+  /**
+   * Number of max retries for the hdfs consumer readers per partition.
+   * @param maxRetries number of max retires for HDFS consumer.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withConsumerNumMaxRetries(long maxRetries) {
+    this.consumerMaxRetries = Optional.of(maxRetries);
+    return this;
+  }
+
+  /**
+   * White list used by directory partitioner to filter out unwanted files in a hdfs directory.
+   * @param whiteList white list for HDFS consumer inputs.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) {
+    this.consumerWhiteList = Optional.of(StringUtils.stripToNull(whiteList));
+    return this;
+  }
+
+  /**
+   * Black list used by directory partitioner to filter out unwanted files in a hdfs directory.
+   * @param blackList black list for HDFS consumer inputs.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withConsumerBlackList(String blackList) {
+    this.consumerBlackList = Optional.of(StringUtils.stripToNull(blackList));
+    return this;
+  }
+
+  /**
+   * Group pattern used by directory partitioner for advanced partitioning.
+   * @param groupPattern group parttern for HDFS consumer inputs.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) {
+    this.consumerGroupPattern = Optional.of(StringUtils.stripToNull(groupPattern));
+    return this;
+  }
+
+  /**
+   * The type of the file reader for consumer (avro, plain, etc.)
+   * @param readerType reader type for HDFS consumer inputs.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withReaderType(String readerType) {
+    this.consumerReader = Optional.of(StringUtils.stripToNull(readerType));
+    return this;
+  }
+
+  /**
+   * Staging directory for storing partition description. If not set, will use the staging directory set
+   * by yarn job.
+   * @param stagingDirectory staging directory for HDFS consumer inputs.
+   * @return this system descriptor
+   */
+  public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) {
+    this.consumerStagingDirectory = Optional.of(StringUtils.stripToNull(stagingDirectory));
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> config = new HashMap<>(super.toConfig());
+    String systemName = getSystemName();
+
+    this.datePathFormat.ifPresent(
+        val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val));
+    this.outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val));
+    this.writeBatchSizeBytes.ifPresent(
+        val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val)));
+    this.writeBatchSizeRecords.ifPresent(
+        val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val)));
+    this.writeCompressionType.ifPresent(
+        val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val));
+    this.writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val));
+
+    this.consumerBufferCapacity.ifPresent(
+        val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val)));
+    this.consumerMaxRetries.ifPresent(
+        val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val)));
+    this.consumerWhiteList.ifPresent(
+        val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val));
+    this.consumerBlackList.ifPresent(
+        val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val));
+    this.consumerGroupPattern.ifPresent(
+        val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val));
+    this.consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val));
+    this.consumerStagingDirectory.ifPresent(
+        val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val));
+
+    return config;
+  }
+}
diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java
new file mode 100644 (file)
index 0000000..78d85e9
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.hdfs.descriptors;
+
+import java.util.Map;
+
+import org.apache.samza.system.hdfs.HdfsConfig;
+import org.apache.samza.system.hdfs.HdfsSystemFactory;
+import org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestHdfsSystemDescriptor {
+  @Test
+  public void testMajorConfigGeneration() {
+    String systemName = "hdfs";
+
+    HdfsSystemDescriptor sd = new HdfsSystemDescriptor(systemName).withConsumerBufferCapacity(950)
+        .withConsumerWhiteList(".*")
+        .withReaderType("avro")
+        .withOutputBaseDir("/home/output")
+        .withWriterClassName(AvroDataFileHdfsWriter.class.getName());
+    sd.getInputDescriptor("input");
+
+    Map<String, String> generatedConfig = sd.toConfig();
+    Assert.assertEquals(6, generatedConfig.size());
+    System.out.println(generatedConfig);
+
+    Assert.assertEquals(HdfsSystemFactory.class.getName(), generatedConfig.get("systems.hdfs.samza.factory"));
+    Assert.assertEquals("950", generatedConfig.get(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName)));
+    Assert.assertEquals(".*",
+        generatedConfig.get(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName)));
+    Assert.assertEquals("avro", generatedConfig.get(String.format(HdfsConfig.FILE_READER_TYPE(), systemName)));
+    Assert.assertEquals("/home/output", generatedConfig.get(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName)));
+    Assert.assertEquals(AvroDataFileHdfsWriter.class.getName(),
+        generatedConfig.get(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName)));
+  }
+}