SAMZA-693: added simple HDFS Producer system
authorEli Reisman <apache.mailbox@gmail.com>
Fri, 7 Aug 2015 22:31:53 +0000 (15:31 -0700)
committerYan Fang <yanfang724@gmail.com>
Fri, 7 Aug 2015 22:31:53 +0000 (15:31 -0700)
21 files changed:
build.gradle
docs/learn/documentation/versioned/hdfs/producer.md [new file with mode: 0644]
docs/learn/documentation/versioned/index.html
docs/learn/documentation/versioned/jobs/configuration-table.html
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala [new file with mode: 0644]
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala [new file with mode: 0644]
samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties [new file with mode: 0644]
samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties [new file with mode: 0644]
samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties [new file with mode: 0644]
samza-hdfs/src/test/resources/samza-hdfs-test-job.properties [new file with mode: 0644]
samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala [new file with mode: 0644]
settings.gradle

index 0852adc..cc30b45 100644 (file)
@@ -81,7 +81,9 @@ rat {
     'samza-test/state/mystore/**',
     'README.md',
     'RELEASE.md',
-    'samza-test/src/main/resources/**'
+    'samza-test/src/main/resources/**',
+    'samza-hdfs/src/main/resources/**',
+    'samza-hdfs/src/test/resources/**'
   ]
 }
 
@@ -209,8 +211,8 @@ project(":samza-kafka_$scalaVersion") {
     minHeapSize = "1560m"
     maxHeapSize = "1560m"
     jvmArgs = ["-XX:+UseConcMarkSweepGC", "-server"]
-    // There appear to be issues between TestKafkaSystemAdmin and 
-    // TestKafkaCheckpointManager both running brokeres and ZK. Restarting the 
+    // There appear to be issues between TestKafkaSystemAdmin and
+    // TestKafkaCheckpointManager both running brokeres and ZK. Restarting the
     // gradle worker after every test clears things up. These tests should be
     // moved to the integration test suite.
     forkEvery = 1
@@ -399,6 +401,30 @@ project(":samza-kv-rocksdb_$scalaVersion") {
   }
 }
 
+project(":samza-hdfs_$scalaVersion") {
+    apply plugin: 'scala'
+
+    dependencies {
+        compile project(':samza-api')
+        compile project(":samza-core_$scalaVersion")
+        compile project(":samza-kafka_$scalaVersion")
+        compile "org.scala-lang:scala-library:$scalaLibVersion"
+        compile("org.apache.hadoop:hadoop-hdfs:$yarnVersion") {
+          exclude module: 'slf4j-log4j12'
+          exclude module: 'servlet-api'
+        }
+        compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
+          exclude module: 'slf4j-log4j12'
+          exclude module: 'servlet-api'
+          exclude module: 'zookeeper'
+        }
+
+        testCompile "junit:junit:$junitVersion"
+        testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+        testCompile "org.apache.hadoop:hadoop-minicluster:$yarnVersion"
+    }
+}
+
 project(":samza-test_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'
@@ -418,6 +444,7 @@ project(":samza-test_$scalaVersion") {
     runtime project(":samza-log4j")
     runtime project(":samza-yarn_$scalaVersion")
     runtime project(":samza-kafka_$scalaVersion")
+    runtime project(":samza-hdfs_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "javax.mail:mail:1.4"
diff --git a/docs/learn/documentation/versioned/hdfs/producer.md b/docs/learn/documentation/versioned/hdfs/producer.md
new file mode 100644 (file)
index 0000000..cfd22c6
--- /dev/null
@@ -0,0 +1,65 @@
+---
+layout: page
+title: Isolation
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+### Writing to HDFS from Samza
+
+The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and two `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. The other writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values.
+
+### Configuring an HdfsSystemProducer
+
+You can configure an HdfsSystemProducer like any other Samza system: using configuration keys and values set in a `job.properties` file.
+You might configure the system producer for use by your `StreamTasks` like this:
+
+```
+# set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs-clickstream'
+systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
+
+# define a serializer/deserializer for the hdfs-clickstream system
+systems.hdfs-clickstream.samza.msg.serde=some-serde-impl
+
+# consumer configs not needed for HDFS system, reader is not implemented yet 
+
+# Assign a Metrics implementation via a label we defined earlier in the props file
+systems.hdfs-clickstream.streams.metrics.samza.msg.serde=some-metrics-impl
+
+# Assign the implementation class for this system's HdfsWriter
+systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
+
+# Set HDFS SequenceFile compression type. Only BLOCK compression is supported currently
+systems.hdfs-clickstream.producer.hdfs.compression.type=snappy
+
+# The base dir for HDFS output. The default Bucketer for SequenceFile HdfsWriters
+# is currently /BASE/JOB_NAME/DATE_PATH/FILES, where BASE is set below
+systems.hdfs-clickstream.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data
+
+# Assign the implementation class for the HdfsWriter's Bucketer
+systems.hdfs-clickstream.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
+
+# Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run.
+systems.hdfs-clickstream.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
+
+# Optionally set the max output bytes per file. A new file will be cut and output
+# continued on the next write call each time this many bytes are written.
+systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728
+```
+
+The above configuration assumes a Metrics and Serde implemnetation has been properly configured against the `some-serde-impl` and `some-metrics-impl` labels somewhere else in the same `job.properties` file. Each of these properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run.
+
index 85209bd..dec5be1 100644 (file)
@@ -80,6 +80,7 @@ title: Documentation
 <ul class="documentation-list">
   <li><a href="yarn/application-master.html">Application Master</a></li>
   <li><a href="yarn/isolation.html">Isolation</a></li>
+  <li><a href="hdfs/producer.html">Writing to HDFS</a></li>
 <!-- TODO write yarn pages
   <li><a href="">Fault Tolerance</a></li>
   <li><a href="">Security</a></li>
index 926fd50..8177fe5 100644 (file)
                         60 seconds.
                     </td>
                 </tr>
+
+                <tr>
+                    <th colspan="3" class="section" id="hdfs-system-producer"><a href="../hdfs/producer.html">Writing to HDFS</a></th>
+                </tr>
+
+                <tr>
+                    <td class="property" id="hdfs-writer-class">systems.*.producer.hdfs.writer.class</td>
+                    <td class="default">org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter</td>
+                    <td class="description">Fully-qualified class name of the HdfsWriter implementation this HDFS Producer system should use</td>
+                </tr>
+                <tr>
+                  <td class="property" id="hdfs-compression-type">systems.*.producer.hdfs.compression.type</td>
+                    <td class="default">none</td>
+                    <td class="description">A human-readable label for the compression type to use, such as "gzip" "snappy" etc. This label will be interpreted differently (or ignored) depending on the nature of the HdfsWriter implementation.</td>
+                </tr>
+                <tr>
+                    <td class="property" id="hdfs-base-output-dir">systems.*.producer.hdfs.base.output.dir</td>
+                    <td class="default">/user/USERNAME/SYSTEMNAME</td>
+                    <td class="description">The base output directory for HDFS writes. Defaults to the home directory of the user who ran the job, followed by the systemName for this HdfsSystemProducer as defined in the job.properties file.</td>
+                </tr>
+                <tr>
+                    <td class="property" id="hdfs-bucketer-class">systems.*.producer.hdfs.bucketer.class</td>
+                    <td class="default">org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer</td>
+                    <td class="description">Fully-qualified class name of the Bucketer implementation that will manage HDFS paths and file names. Used to batch writes by time, or other similar partitioning methods.</td>
+                </tr>
+                <tr>
+                    <td class="property" id="hdfs-bucketer-date-path-format">systems.*.producer.hdfs.bucketer.date.path.format</td>
+                    <td class="default"yyyy_MM_dd></td>
+                    <td class="description">A date format (using Java's SimpleDataFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td>
+                </tr>
+                <tr>
+                  <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.write.batch.size.bytes</td>
+                  <td class="default">268435456</td>
+                  <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
+                </tr>
             </tbody>
         </table>
     </body>
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
new file mode 100644 (file)
index 0000000..7993119
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs
+
+
+import java.text.SimpleDateFormat
+import java.util.UUID
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.{Config, ScalaMapConfig}
+import org.apache.samza.util.{Logging, Util}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+
+object HdfsConfig {
+
+  // date format string for date-pathed output
+  val DATE_PATH_FORMAT_STRING = "systems.%s.producer.hdfs.bucketer.date.path.format"
+  val DATE_PATH_FORMAT_STRING_DEFAULT = "yyyy_MM_dd-HH"
+
+  // HDFS output base dir
+  val BASE_OUTPUT_DIR = "systems.%s.producer.hdfs.base.output.dir"
+  val BASE_OUTPUT_DIR_DEFAULT = "/user/%s/%s"
+
+  // how much data to write before splitting off a new partfile
+  val WRITE_BATCH_SIZE = "systems.%s.producer.hdfs.write.batch.size.bytes"
+  val WRITE_BATCH_SIZE_DEFAULT = (1024L * 1024L * 256L).toString
+
+  // human-readable compression type name to be interpreted/handled by the HdfsWriter impl
+  val COMPRESSION_TYPE = "systems.%s.producer.hdfs.compression.type"
+  val COMPRESSION_TYPE_DEFAULT = "none"
+
+  // fully qualified class name of the HdfsWriter impl for the named Producer system
+  val HDFS_WRITER_CLASS_NAME ="systems.%s.producer.hdfs.writer.class"
+  val HDFS_WRITER_CLASS_NAME_DEFAULT = "org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter"
+
+  // fully qualified class name of the Bucketer impl the HdfsWriter should use to generate HDFS paths and filenames
+  val BUCKETER_CLASS = "systems.%s.producer.hdfs.bucketer.class"
+  val BUCKETER_CLASS_DEFAULT = "org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer"
+
+  implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
+
+}
+
+
+class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
+
+  /**
+   * @return the fully-qualified class name of the HdfsWriter subclass that will write for this system.
+   */
+  def getHdfsWriterClassName(systemName: String): String = {
+    getOrElse(HdfsConfig.HDFS_WRITER_CLASS_NAME format systemName, HdfsConfig.HDFS_WRITER_CLASS_NAME_DEFAULT)
+  }
+
+  /**
+   * The base output directory into which all HDFS output for this job will be written.
+   */
+  def getBaseOutputDir(systemName: String): String = {
+    getOrElse(HdfsConfig.BASE_OUTPUT_DIR format systemName,
+      HdfsConfig.BASE_OUTPUT_DIR_DEFAULT  format (System.getProperty("user.name"), systemName))
+  }
+
+  /**
+   * The Bucketer subclass to instantiate for the job run.
+   */
+  def getHdfsBucketerClassName(systemName: String): String = {
+    getOrElse(HdfsConfig.BUCKETER_CLASS format systemName, HdfsConfig.BUCKETER_CLASS_DEFAULT)
+  }
+
+  /**
+   * In an HdfsWriter implementation that peforms time-based output bucketing,
+   * the user may configure a date format (suitable for inclusion in a file path)
+   * using <code>SimpleDateFormat</code> formatting that the Bucketer implementation will
+   * use to generate HDFS paths and filenames. The more granular this date format, the more
+   * often a bucketing HdfsWriter will begin a new date-path bucket when creating the next output file.
+   */
+  def getDatePathFormatter(systemName: String): SimpleDateFormat = {
+    new SimpleDateFormat(getOrElse(HdfsConfig.DATE_PATH_FORMAT_STRING format systemName, HdfsConfig.DATE_PATH_FORMAT_STRING_DEFAULT))
+  }
+
+  def getFileUniqifier(systemName: String): String = {
+    systemName + "-" + UUID.randomUUID + "-"
+  }
+
+  /**
+   * Split output files from all writer tasks based on # of bytes written to optimize
+   * MapReduce utilization for Hadoop jobs that will process the data later.
+   */
+  def getWriteBatchSizeBytes(systemName: String): Long = {
+    getOrElse(HdfsConfig.WRITE_BATCH_SIZE format systemName, HdfsConfig.WRITE_BATCH_SIZE_DEFAULT).toLong
+  }
+
+  /**
+   * Simple, human-readable label for various compression options. HdfsWriter implementations
+   * can choose how to handle these individually, or throw an exception. Example: "none", "gzip", ...
+   */
+  def getCompressionType(systemName: String): String = {
+    getOrElse(HdfsConfig.COMPRESSION_TYPE format systemName, HdfsConfig.COMPRESSION_TYPE_DEFAULT)
+  }
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala
new file mode 100644 (file)
index 0000000..f6d53e0
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs
+
+
+import org.apache.samza.util.Logging
+import org.apache.samza.system.{SystemAdmin, SystemStreamMetadata, SystemStreamPartition}
+
+
+class HdfsSystemAdmin extends SystemAdmin with Logging {
+
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+    new java.util.HashMap[SystemStreamPartition, String]()
+  }
+
+  def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
+    new java.util.HashMap[String, SystemStreamMetadata]()
+  }
+
+  def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+  def createCoordinatorStream(streamName: String) {
+    throw new UnsupportedOperationException("Method not implemented.")
+  }
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
new file mode 100644 (file)
index 0000000..ef3c20a
--- /dev/null
@@ -0,0 +1,45 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs
+
+
+import org.apache.samza.SamzaException
+
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.util.{KafkaUtil,Logging}
+
+
+class HdfsSystemFactory extends SystemFactory with Logging {
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    throw new SamzaException("HdfsSystemFactory does not implement a consumer")
+  }
+
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val metrics = new HdfsSystemProducerMetrics(systemName, registry)
+    new HdfsSystemProducer(systemName, clientId, config, metrics)
+  }
+
+  def getAdmin(systemName: String, config: Config) = {
+    new HdfsSystemAdmin
+  }
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
new file mode 100644 (file)
index 0000000..1f4b5c4
--- /dev/null
@@ -0,0 +1,94 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs
+
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.conf.Configuration
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.apache.samza.system.hdfs.HdfsConfig._
+import org.apache.samza.system.{SystemProducer, OutgoingMessageEnvelope}
+import org.apache.samza.system.hdfs.writer.HdfsWriter
+import org.apache.samza.util.{Logging, ExponentialSleepStrategy, TimerUtils, KafkaUtil}
+import scala.collection.mutable.{Map => MMap}
+
+
+class HdfsSystemProducer(
+  systemName: String, clientId: String, config: HdfsConfig, metrics: HdfsSystemProducerMetrics,
+  val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
+  val dfs = FileSystem.get(new Configuration(true))
+  val writers: MMap[String, HdfsWriter[_]] = MMap.empty[String, HdfsWriter[_]]
+
+  def start(): Unit = {
+    info("entering HdfsSystemProducer.start() call for system: " + systemName + ", client: " + clientId)
+  }
+
+  def stop(): Unit = {
+    info("entering HdfsSystemProducer.stop() for system: " + systemName + ", client: " + clientId)
+    writers.values.map { _.close }
+    dfs.close
+  }
+
+  def register(source: String): Unit = {
+    info("entering HdfsSystemProducer.register(" + source + ") " +
+      "call for system: " + systemName + ", client: " + clientId)
+    writers += (source -> HdfsWriter.getInstance(dfs, systemName, config))
+  }
+
+  def flush(source: String): Unit = {
+    debug("entering HdfsSystemProducer.flush(" + source + ") " +
+      "call for system: " + systemName + ", client: " + clientId)
+    try {
+      metrics.flushes.inc
+      updateTimer(metrics.flushMs) { writers.get(source).head.flush }
+      metrics.flushSuccess.inc
+    } catch {
+      case e: Exception => {
+        metrics.flushFailed.inc
+        warn("Exception thrown while client " + clientId + " flushed HDFS out stream, msg: " + e.getMessage)
+        debug("Detailed message from exception thrown by client " + clientId + " in HDFS flush: ", e)
+        writers.get(source).head.close
+        throw e
+      }
+    }
+  }
+
+  def send(source: String, ome: OutgoingMessageEnvelope) = {
+    debug("entering HdfsSystemProducer.send(source = " + source + ", envelope) " +
+      "call for system: " + systemName + ", client: " + clientId)
+    metrics.sends.inc
+    try {
+      updateTimer(metrics.sendMs) {
+        writers.get(source).head.write(ome)
+      }
+      metrics.sendSuccess.inc
+    } catch {
+      case e: Exception => {
+        metrics.sendFailed.inc
+        warn("Exception thrown while client " + clientId + " wrote to HDFS, msg: " + e.getMessage)
+        debug("Detailed message from exception thrown by client " + clientId + " in HDFS write: ", e)
+        writers.get(source).head.close
+        throw e
+      }
+    }
+  }
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala
new file mode 100644 (file)
index 0000000..10b2226
--- /dev/null
@@ -0,0 +1,42 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs
+
+
+import org.apache.samza.metrics.{MetricsRegistry, MetricsHelper, Gauge, MetricsRegistryMap}
+
+
+class HdfsSystemProducerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+  /* Tracks the number of calls made to send in producer */
+  val sends = newCounter("producer-sends")
+  val sendSuccess = newCounter("send-success")
+  val sendFailed = newCounter("send-failed")
+  val sendMs = newTimer("send-ms")
+
+  /* Tracks the number of calls made to flush in producer */
+  val flushes = newCounter("flushes")
+  val flushFailed = newCounter("flush-failed")
+  val flushSuccess = newCounter("flush-success")
+  val flushMs = newTimer("flush-ms")
+
+  override def getPrefix = systemName + "-"
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala
new file mode 100644 (file)
index 0000000..ff89bc9
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{SequenceFile, Writable, BytesWritable}
+
+import org.apache.samza.system.hdfs.HdfsConfig
+import org.apache.samza.system.hdfs.HdfsConfig._
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+
+/**
+ * Implentation of HdfsWriter for SequenceFiles using BytesWritable keys and values. The key
+ * type is currently just a dummy record. This class is usable when the outgoing message
+ * can be converted directly to an Array[Byte] at write time.
+ */
+class BinarySequenceFileHdfsWriter(dfs: FileSystem, systemName: String, config: HdfsConfig) extends SequenceFileHdfsWriter(dfs, systemName, config) {
+  private lazy val defaultBytesWritableKey = new BytesWritable(Array.empty[Byte])
+
+  def getKey = defaultBytesWritableKey
+
+  def getValue(outgoing: OutgoingMessageEnvelope) = {
+    new BytesWritable(outgoing.getMessage.asInstanceOf[Array[Byte]])
+  }
+
+  def getOutputSizeInBytes(writable: Writable) = {
+    writable.asInstanceOf[BytesWritable].getLength
+  }
+
+  def keyClass = classOf[BytesWritable]
+
+  def valueClass = classOf[BytesWritable]
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala
new file mode 100644 (file)
index 0000000..2b153a1
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+
+import org.apache.samza.system.hdfs.HdfsConfig
+import org.apache.samza.config.Config
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+
+object Bucketer {
+
+  /**
+   * Factory for the Bucketer subclass the user configured in the job properties file.
+   */
+  def getInstance(systemName: String, samzaConfig: HdfsConfig): Bucketer = {
+    Class.forName(samzaConfig.getHdfsBucketerClassName(systemName))
+      .getConstructor(classOf[String], classOf[HdfsConfig])
+      .newInstance(systemName, samzaConfig)
+      .asInstanceOf[Bucketer]
+  }
+
+}
+
+
+/**
+ * Utility for plugging in various methods of bucketing. Used by HdfsWriters
+ * when a file is completed and a new file is created. Includes path and filename.
+ */
+trait Bucketer {
+
+  /**
+   * Has an event occured that requires us to change the current
+   * HDFS path for output files. Could be time passing in a date bucket, etc.
+   */
+  def shouldChangeBucket: Boolean
+
+  /**
+   * Given the current FileSystem, generate a new HDFS write path and file name.
+   */
+  def getNextWritePath(dfs: FileSystem): Path
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala
new file mode 100644 (file)
index 0000000..b82018c
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+
+import org.apache.hadoop.fs.FileSystem
+
+import org.apache.samza.config.Config
+import org.apache.samza.system.hdfs.HdfsConfig
+import org.apache.samza.system.hdfs.HdfsConfig._
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+
+object HdfsWriter {
+
+  def getInstance(dfs: FileSystem, systemName: String, samzaConfig: HdfsConfig): HdfsWriter[_] = {
+    // instantiate whatever subclass the user configured the system to use
+    Class.forName(samzaConfig.getHdfsWriterClassName(systemName))
+      .getConstructor(classOf[FileSystem], classOf[String], classOf[HdfsConfig])
+      .newInstance(dfs, systemName, samzaConfig)
+      .asInstanceOf[HdfsWriter[_]]
+  }
+
+}
+
+/**
+ * Base for all HdfsWriter implementations.
+ */
+abstract class HdfsWriter[W](dfs: FileSystem, systemName: String, config: HdfsConfig) {
+  protected var writer: Option[W] = None
+
+  def flush: Unit
+
+  def write(buffer: OutgoingMessageEnvelope): Unit
+
+  def close: Unit
+
+}
+
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala
new file mode 100644 (file)
index 0000000..b4377a2
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+
+import org.apache.samza.config.Config
+import org.apache.samza.system.hdfs.HdfsConfig
+import org.apache.samza.system.hdfs.HdfsConfig._
+
+
+/**
+ * Configurable path and file name manager for HdfsWriters. Buckets in the format:
+ * /BASE_PATH/JOB_NAME/DATE_FORMAT/FILENAME
+ *
+ * BASE_PATH is configurable in the job properties file, and defaults to: /user/USERNAME/samza-output
+ * JOB_NAME is the job name as configured in the properties file/run args
+ * DATE_FORMAT is configurable in the job properties file, and defaults to: YEAR_MONTH_DAY-HOUR
+ * FILENAME is a unique per-task and per-file name
+ */
+class JobNameDateTimeBucketer(systemName: String, config: HdfsConfig) extends Bucketer {
+  val basePath = config.getBaseOutputDir(systemName)
+  val dateFormatter = config.getDatePathFormatter(systemName)
+  val fileBase = config.getFileUniqifier(systemName)
+
+  var partIndex: Int = 0
+  var currentDateTime = ""
+
+  /**
+   * Test each write to see if we need to cut a new output file based on
+   * time bucketing, regardless of hitting configured file size limits etc.
+   */
+  override def shouldChangeBucket: Boolean = {
+    currentDateTime != dateFormatter.format(new java.util.Date)
+  }
+
+  /**
+   * Given the current FileSystem, returns the next full HDFS path + filename to write to.
+   */
+  override def getNextWritePath(dfs: FileSystem): Path = {
+    val dateTime = dateFormatter.format(new java.util.Date)
+    val base = new Path(Seq(basePath, dateTime).mkString("/"))
+    if (dateTime != currentDateTime) {
+      currentDateTime = dateTime
+      FileSystem.mkdirs(dfs, base, FsPermission.getDirDefault)
+    }
+    new Path(base, nextPartFile)
+  }
+
+  protected def nextPartFile: String = {
+    partIndex += 1
+    if (partIndex > 99999) partIndex = 1
+    fileBase + "%05d".format(partIndex)
+  }
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala
new file mode 100644 (file)
index 0000000..46ade33
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{SequenceFile, Writable, IOUtils}
+import org.apache.hadoop.io.SequenceFile.Writer
+import org.apache.hadoop.io.compress.{CompressionCodec, DefaultCodec, GzipCodec, SnappyCodec}
+
+import org.apache.samza.system.hdfs.HdfsConfig
+import org.apache.samza.system.hdfs.HdfsConfig._
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+
+abstract class SequenceFileHdfsWriter(dfs: FileSystem, systemName: String, config: HdfsConfig)
+  extends HdfsWriter[SequenceFile.Writer](dfs, systemName, config) {
+
+  val batchSize = config.getWriteBatchSizeBytes(systemName)
+  val bucketer = Some(Bucketer.getInstance(systemName, config))
+
+  var bytesWritten = 0L
+
+  /**
+   * Generate a key (usually a singleton dummy value) appropriate for the SequenceFile you're writing
+   */
+  def getKey: Writable
+
+  /**
+   * Wrap the outgoing message in an appropriate Writable
+   */
+  def getValue(outgoing: OutgoingMessageEnvelope): Writable
+
+  /**
+   * Calculate (or estimate) the byte size of the outgoing message. Used internally
+   * by HdfsWriters to decide when to cut a new output file based on max size.
+   */
+  def getOutputSizeInBytes(writable: Writable): Long
+
+  /**
+   * The Writable key class for the SequenceFile type
+   */
+  def keyClass: Class[_ <: Writable]
+
+  /**
+   * The Writable value class for the SequenceFile type
+   */
+  def valueClass: Class[_ <: Writable]
+
+ /**
+   *  Accepts a human-readable compression type from the job properties file such as
+   *  "gzip", "snappy", or "none" - returns an appropriate SequenceFile CompressionCodec.
+   */
+  def getCompressionCodec(compressionType: String) = {
+    compressionType match {
+      case "snappy" => new SnappyCodec()
+
+      case "gzip"   => new GzipCodec()
+
+      case _        => new DefaultCodec()
+    }
+  }
+
+  override def flush: Unit = writer.map { _.hflush }
+
+  override def write(outgoing: OutgoingMessageEnvelope): Unit = {
+    if (shouldStartNewOutputFile) {
+      close 
+      writer = getNextWriter
+    }
+
+    writer.map { seq =>
+      val writable = getValue(outgoing)
+      bytesWritten += getOutputSizeInBytes(writable)
+      seq.append(getKey, writable)
+    }
+  }
+
+  override def close: Unit = {
+    writer.map { w => w.hflush ; IOUtils.closeStream(w) }
+    writer = None
+    bytesWritten = 0L
+  }
+
+  protected def shouldStartNewOutputFile: Boolean = {
+    bytesWritten >= batchSize || bucketer.get.shouldChangeBucket
+  }
+
+  protected def getNextWriter: Option[SequenceFile.Writer] = {
+    val path = bucketer.get.getNextWritePath(dfs)
+    Some(
+      SequenceFile.createWriter(
+        dfs.getConf,
+        Writer.stream(dfs.create(path)),
+        Writer.keyClass(keyClass),
+        Writer.valueClass(valueClass),
+        Writer.compression(
+          SequenceFile.CompressionType.BLOCK,
+          getCompressionCodec(config.getCompressionType(systemName))
+        )
+      )
+    )
+  }
+
+}
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala
new file mode 100644 (file)
index 0000000..2579bf0
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{SequenceFile, Writable, LongWritable, Text}
+
+import org.apache.samza.system.hdfs.HdfsConfig
+import org.apache.samza.system.hdfs.HdfsConfig._
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+
+/**
+ * Implentation of HdfsWriter for SequenceFiles using LongWritable keys and BytesWritable values.
+ * The key type is currently just a dummy record. This class is usable when the outgoing message
+ * can be converted directly to a String at write time.
+ */
+class TextSequenceFileHdfsWriter(dfs: FileSystem, systemName: String, config: HdfsConfig) extends SequenceFileHdfsWriter(dfs, systemName, config) {
+  private lazy val defaultLongWritableKey = new LongWritable(0L)
+
+  def getKey = defaultLongWritableKey
+
+  def getValue(outgoing: OutgoingMessageEnvelope) = {
+    new Text(outgoing.getMessage.asInstanceOf[String])
+  }
+
+  def getOutputSizeInBytes(writable: Writable) = {
+    writable.asInstanceOf[Text].getLength
+  }
+
+  def keyClass = classOf[LongWritable]
+
+  def valueClass = classOf[Text]
+
+}
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties
new file mode 100644 (file)
index 0000000..b590e29
--- /dev/null
@@ -0,0 +1,2 @@
+systems.samza-hdfs-test-batch-job-text.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
+systems.samza-hdfs-test-batch-job-text.producer.hdfs.write.batch.size.bytes=512
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties
new file mode 100644 (file)
index 0000000..ab90548
--- /dev/null
@@ -0,0 +1 @@
+systems.samza-hdfs-test-batch-job.producer.hdfs.write.batch.size.bytes=512
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties
new file mode 100644 (file)
index 0000000..9df1397
--- /dev/null
@@ -0,0 +1 @@
+systems.samza-hdfs-test-job-text.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job.properties
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
new file mode 100644 (file)
index 0000000..c4b04a1
--- /dev/null
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs
+
+
+import java.io.{File, IOException}
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.hdfs.{DFSConfigKeys,MiniDFSCluster}
+import org.apache.hadoop.io.{SequenceFile, BytesWritable, LongWritable, Text}
+import org.apache.hadoop.io.SequenceFile.Reader
+
+import org.apache.samza.config.Config
+import org.apache.samza.system.hdfs.HdfsConfig._
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream}
+import org.apache.samza.util.Logging
+
+import org.junit.{AfterClass, BeforeClass, Test}
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+
+object TestHdfsSystemProducerTestSuite {
+  val TEST = "test"
+  val BLOCK_SIZE = 128 * 1024 * 1024 // 128MB
+  val BATCH_SIZE = 512 // in bytes, to get multiple file splits in one of the tests
+  val PAUSE = 500 // in millis
+  val JOB_NAME = "samza-hdfs-test-job" // write some data as BytesWritable
+  val BATCH_JOB_NAME = "samza-hdfs-test-batch-job" // write enough binary data to force the producer to split partfiles
+  val TEXT_JOB_NAME = "samza-hdfs-test-job-text" // write some data as String
+  val TEXT_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-text" // force a file split, understanding that Text does some compressing
+  val RESOURCE_PATH_FORMAT = "file://%s/src/test/resources/%s.properties"
+  val TEST_DATE = (new SimpleDateFormat("yyyy_MM_dd-HH")).format(new Date)
+
+  // Test data
+  val EXPECTED = Array[String]("small_data", "medium_data", "large_data")
+  val LUMP = new scala.util.Random().nextString(BATCH_SIZE)
+
+  val hdfsFactory = new TestHdfsSystemFactory()
+  val propsFactory = new PropertiesConfigFactory()
+  val cluster = getMiniCluster
+
+  def testWritePath(job: String): Path = new Path(
+    Seq("/user/", System.getProperty("user.name"), job, TEST_DATE).mkString("/")
+  )
+
+  def getMiniCluster: Option[MiniDFSCluster] = {
+    val conf = new Configuration(false)
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE)
+
+    val cluster = Some(new MiniDFSCluster.Builder(conf).numDataNodes(3).build)
+    cluster.get.waitActive
+
+    cluster
+  }
+
+  def buildProducer(name: String, cluster: MiniDFSCluster): Option[HdfsSystemProducer] @unchecked = {
+   Some(
+      hdfsFactory.getProducer(
+        name,
+        propsFactory.getConfig(URI.create(RESOURCE_PATH_FORMAT format (new File(".").getCanonicalPath, name))),
+        new HdfsSystemProducerMetrics(name),
+        cluster
+      )
+    )
+  }
+
+  def getReader(dfs: FileSystem, path: Path): SequenceFile.Reader = {
+    new SequenceFile.Reader(dfs.getConf, Reader.file(path))
+  }
+
+  @AfterClass
+  def tearDownMiniCluster: Unit = cluster.map{ _.shutdown }
+
+}
+
+
+class TestHdfsSystemProducerTestSuite extends Logging {
+  import org.apache.samza.system.hdfs.TestHdfsSystemProducerTestSuite._
+
+  @Test
+  def testHdfsSystemProducerBinaryWrite {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(JOB_NAME, cluster.get)
+      producer.get.register(TEST)
+      producer.get.start
+
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(JOB_NAME, TEST)
+      EXPECTED.map { _.getBytes("UTF-8") }.map {
+        buffer: Array[Byte] => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, buffer))
+      }
+
+      producer.get.stop
+      producer = None
+
+      val results = cluster.get.getFileSystem.listStatus(testWritePath(JOB_NAME))
+      val bytesWritten = results.toList.foldLeft(0L) { (acc, status) => acc + status.getLen }
+      assertTrue(results.length == 1)
+      assertTrue(bytesWritten > 0L)
+
+      results.foreach { r =>
+        val reader = getReader(cluster.get.getFileSystem, r.getPath)
+        val key = new BytesWritable
+        val value = new BytesWritable
+        (0 to 2).foreach { i =>
+          reader.next(key, value)
+          assertEquals(EXPECTED(i), new String(value.copyBytes, "UTF-8"))
+        }
+      }
+    } finally {
+      producer.map { _.stop }
+    }
+  }
+
+  @Test
+  def testHdfsSystemProducerWriteBinaryBatches {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(BATCH_JOB_NAME, cluster.get)
+
+      producer.get.start
+      producer.get.register(TEST)
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(BATCH_JOB_NAME, TEST)
+      (1 to 6).map { i => LUMP.getBytes("UTF-8") }.map {
+        buffer => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, buffer))
+      }
+
+      producer.get.stop
+      producer = None
+
+      val results = cluster.get.getFileSystem.listStatus(testWritePath(BATCH_JOB_NAME))
+
+      assertEquals(6, results.length)
+      results.foreach { r =>
+        val reader = getReader(cluster.get.getFileSystem, r.getPath)
+        val key = new BytesWritable
+        val value = new BytesWritable
+        (1 to BATCH_SIZE).foreach { i =>
+          reader.next(key, value)
+          val data = value.copyBytes
+          assertEquals(LUMP, new String(data, "UTF-8"))
+          assertEquals(LUMP.getBytes("UTF-8").length, data.length)
+        }
+      }
+
+    } finally {
+      producer.map { _.stop }
+    }
+  }
+
+  @Test
+  def testHdfsSystemProducerTextWrite {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(TEXT_JOB_NAME, cluster.get)
+      producer.get.register(TEST)
+      producer.get.start
+
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(TEXT_JOB_NAME, TEST)
+      EXPECTED.map {
+        buffer: String => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, buffer))
+      }
+
+      producer.get.stop
+      producer = None
+
+      val results = cluster.get.getFileSystem.listStatus(testWritePath(TEXT_JOB_NAME))
+      val bytesWritten = results.toList.foldLeft(0L) { (acc, status) => acc + status.getLen }
+      assertTrue(results.length == 1)
+      assertTrue(bytesWritten > 0L)
+
+      results.foreach { r =>
+        val reader = getReader(cluster.get.getFileSystem, r.getPath)
+        val key = new LongWritable
+        val value = new Text
+        (0 to 2).foreach { i =>
+          reader.next(key, value)
+          assertEquals(EXPECTED(i), value.toString)
+        }
+      }
+    } finally {
+      producer.map { _.stop }
+    }
+  }
+
+  @Test
+  def testHdfsSystemProducerWriteTextBatches {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(TEXT_BATCH_JOB_NAME, cluster.get)
+
+      producer.get.start
+      producer.get.register(TEST)
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(TEXT_BATCH_JOB_NAME, TEST)
+      (1 to 6).map {
+        i => producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, LUMP))
+      }
+
+      producer.get.stop
+      producer = None
+
+      val results = cluster.get.getFileSystem.listStatus(testWritePath(TEXT_BATCH_JOB_NAME))
+
+      assertEquals(6, results.length)
+      results.foreach { r =>
+        val reader = getReader(cluster.get.getFileSystem, r.getPath)
+        val key = new LongWritable
+        val value = new Text
+        (1 to BATCH_SIZE).foreach { i =>
+          reader.next(key, value)
+          val data = value.toString
+          assertEquals(LUMP, data)
+          assertEquals(LUMP.length, data.length)
+        }
+      }
+
+    } finally {
+      producer.map { _.stop }
+    }
+  }
+
+}
+
+
+class TestHdfsSystemProducer(systemName: String, config: HdfsConfig, clientId: String, metrics: HdfsSystemProducerMetrics, mini: MiniDFSCluster)
+  extends HdfsSystemProducer(systemName, clientId, config, metrics) {
+    override val dfs = mini.getFileSystem
+}
+
+
+class TestHdfsSystemFactory extends HdfsSystemFactory {
+    def getProducer(systemName: String, config: Config, metrics: HdfsSystemProducerMetrics, cluster: MiniDFSCluster) = {
+      new TestHdfsSystemProducer(systemName, config, "test", metrics, cluster)
+    }
+}
index 19bff97..a8d2c88 100644 (file)
@@ -28,6 +28,7 @@ def scalaModules = [
         'samza-kv',
         'samza-kv-inmemory',
         'samza-kv-rocksdb',
+        'samza-hdfs',
         'samza-yarn',
         'samza-test'
 ] as HashSet