SAMZA-1596: Staging directory name has to be formatted in config
author Akim Akimov <zedmor@gmail.com>
Thu, 1 Mar 2018 18:58:06 +0000 (10:58 -0800)
committer Jagadish <jvenkatraman@linkedin.com>
Thu, 1 Mar 2018 18:58:06 +0000 (10:58 -0800)
When we instantiate the HDFS config, getStagingDirectory is missing a formatter for the staging directory key, so systems.hdfs-system-name.stagingDirectory is not parsed from the config; only the literal, unformatted key systems.%s.stagingDirectory is parsed instead.

The solution is to add a formatter to the getStagingDirectory method.

Author: Akim Akimov <zedmor@gmail.com>

Reviewers: Jagadish <jagadish@apache.org>, Hai L <halu@linkedin.com>

Closes #431 from Zedmor/FixStagingDirHDFS

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java

index 9251db0..28a1bac 100644 (file)
@@ -100,7 +100,7 @@ public class HdfsSystemAdmin implements SystemAdmin {
     directoryPartitioner = new DirectoryPartitioner(hdfsConfig.getPartitionerWhiteList(systemName),
       hdfsConfig.getPartitionerBlackList(systemName), hdfsConfig.getPartitionerGroupPattern(systemName),
       new HdfsFileSystemAdapter());
-    stagingDirectory = hdfsConfig.getStagingDirectory();
+    stagingDirectory = hdfsConfig.getStagingDirectory(systemName);
     readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
   }
 
index 230625d..92457ab 100644 (file)
@@ -121,7 +121,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
     super(consumerMetrics.getMetricsRegistry());
     hdfsConfig = new HdfsConfig(config);
     readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
-    stagingDirectory = hdfsConfig.getStagingDirectory();
+    stagingDirectory = hdfsConfig.getStagingDirectory(systemName);
     bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
     numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(systemName);
     readers = new ConcurrentHashMap<>();
index 52e19bf..06bda2a 100644 (file)
@@ -197,7 +197,7 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
    * Staging directory for storing partition description. If not set, will use the staging directory set
    * by yarn job.
    */
-  def getStagingDirectory(): String = {
-    getOrElse(HdfsConfig.STAGING_DIRECTORY, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
+  def getStagingDirectory(systemName: String): String = {
+    getOrElse(HdfsConfig.STAGING_DIRECTORY format systemName, getOrElse(YarnConfig.YARN_JOB_STAGING_DIRECTORY, HdfsConfig.STAGING_DIRECTORY_DEFAULT))
   }
 }
index 481988f..6cbf7ba 100644 (file)
@@ -63,7 +63,7 @@ public class TestHdfsSystemConsumer {
     properties.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*TestHdfsSystemConsumer.*avro");
     Path stagingDirectory = Files.createTempDirectory("staging");
     stagingDirectory.toFile().deleteOnExit();
-    properties.put(HdfsConfig.STAGING_DIRECTORY(), stagingDirectory.toString());
+    properties.put(String.format(HdfsConfig.STAGING_DIRECTORY(), SYSTEM_NAME), stagingDirectory.toString());
     return new MapConfig(properties);
   }