fix HdfsSystemAdmin when staging directory is empty
authorHai Lu <halu@linkedin.com>
Tue, 2 May 2017 19:09:55 +0000 (12:09 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Tue, 2 May 2017 19:09:55 +0000 (12:09 -0700)
getSystemStreamMetadata has the potential side effect to persist metadata to a staging directory on hdfs. This could fail if staging directory is empty. This patch addresses the issue with test to cover the scenario.

Author: Hai Lu <halu@linkedin.com>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes #151 from lhaiesp/master

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java
samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java

index 8bf31c5..f5b05fb 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.system.hdfs;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -116,10 +117,17 @@ public class HdfsSystemAdmin implements SystemAdmin {
   }
 
   static Map<Partition, List<String>> obtainPartitionDescriptorMap(String stagingDirectory, String streamName) {
+    if (StringUtils.isBlank(stagingDirectory)) {
+      LOG.info("Empty or null staging directory: {}", stagingDirectory);
+      return Collections.emptyMap();
+    }
+    if (StringUtils.isBlank(streamName)) {
+      throw new SamzaException(String.format("stream name (%s) is null or empty!", streamName));
+    }
     Path path = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
     try (FileSystem fs = path.getFileSystem(new Configuration())) {
       if (!fs.exists(path)) {
-        return null;
+        return Collections.emptyMap();
       }
       try (FSDataInputStream fis = fs.open(path)) {
         String json = IOUtils.toString(fis, StandardCharsets.UTF_8);
@@ -135,6 +143,10 @@ public class HdfsSystemAdmin implements SystemAdmin {
    */
   private void persistPartitionDescriptor(String streamName,
     Map<Partition, List<String>> partitionDescriptorMap) {
+    if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) {
+      LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName);
+      return;
+    }
     Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
     try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
       // Partition descriptor is supposed to be immutable. So don't override it if it exists.
@@ -153,6 +165,10 @@ public class HdfsSystemAdmin implements SystemAdmin {
   }
 
   private boolean partitionDescriptorExists(String streamName) {
+    if (StringUtils.isBlank(stagingDirectory) || StringUtils.isBlank(streamName)) {
+      LOG.warn("Staging directory ({}) or stream name ({}) is empty", stagingDirectory, streamName);
+      return false;
+    }
     Path targetPath = PartitionDescriptorUtil.getPartitionDescriptorPath(stagingDirectory, streamName);
     try (FileSystem fs = targetPath.getFileSystem(new Configuration())) {
       return fs.exists(targetPath);
@@ -161,6 +177,17 @@ public class HdfsSystemAdmin implements SystemAdmin {
     }
   }
 
+  /**
+   *
+   * Fetch metadata from hdfs system for a set of streams. This has the potential side effect
+   * to persist partition description to the staging directory on hdfs if staging directory
+   * is not empty. See getStagingDirectory on {@link HdfsConfig}
+   *
+   * @param streamNames
+   *          The streams to to fetch metadata for.
+   * @return A map from stream name to SystemStreamMetadata for each stream
+   *         requested in the parameter set.
+   */
   @Override
   public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
     Map<String, SystemStreamMetadata> systemStreamMetadataMap = new HashMap<>();
index 13a7102..fb9bb56 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -132,6 +133,12 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
         public Map<Partition, List<String>> load(String streamName)
           throws Exception {
           Validate.notEmpty(streamName);
+          if (StringUtils.isBlank(stagingDirectory)) {
+            throw new SamzaException("Staging directory can't be empty. "
+                + "Is this not a yarn job (currently hdfs system consumer only works in "
+                + "the same yarn environment on which hdfs is running)? " + "Is STAGING_DIRECTORY ("
+                + HdfsConfig.STAGING_DIRECTORY() + ") not set (see HdfsConfig.scala)?");
+          }
           return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
         }
       });
index 5cad1e4..0661139 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.system.hdfs.partitioner;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,6 +33,8 @@ import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -192,12 +195,12 @@ public class DirectoryPartitioner {
   public Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadataMap(String streamName,
     @Nullable Map<Partition, List<String>> existingPartitionDescriptorMap) {
     LOG.info("Trying to obtain metadata for " + streamName);
-    LOG.info("Existing partition descriptor: " + (existingPartitionDescriptorMap == null ? "empty"
+    LOG.info("Existing partition descriptor: " + (MapUtils.isEmpty(existingPartitionDescriptorMap) ? "empty"
       : existingPartitionDescriptorMap));
     Map<Partition, SystemStreamPartitionMetadata> partitionMetadataMap = new HashMap<>();
     partitionDescriptorMap.putIfAbsent(streamName, new HashMap<>());
     List<FileMetadata> filteredFiles = getFilteredFiles(streamName);
-    if (existingPartitionDescriptorMap != null) {
+    if (!MapUtils.isEmpty(existingPartitionDescriptorMap)) {
       filteredFiles = validateAndGetOriginalFilteredFiles(filteredFiles, existingPartitionDescriptorMap);
     }
     List<List<FileMetadata>> groupedPartitions = generatePartitionGroups(filteredFiles);
index ef5ab00..21afcb9 100644 (file)
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -41,7 +42,7 @@ import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
 import org.junit.Test;
 
-
+import com.google.common.util.concurrent.UncheckedExecutionException;
 
 
 public class TestHdfsSystemConsumer {
@@ -133,4 +134,37 @@ public class TestHdfsSystemConsumer {
       Assert.assertEquals(messages.get(NUM_EVENTS).getOffset(), IncomingMessageEnvelope.END_OF_STREAM_OFFSET);
     });
   }
+
+  /*
+   * Ensure that empty staging directory will not break system admin,
+   * but should fail system consumer
+   */
+  @Test
+  public void testEmptyStagingDirectory() throws Exception {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), SYSTEM_NAME), ".*avro");
+    Config config = new MapConfig(configMap);
+    HdfsSystemFactory systemFactory = new HdfsSystemFactory();
+
+    // create admin and do partitioning
+    HdfsSystemAdmin systemAdmin = systemFactory.getAdmin(SYSTEM_NAME, config);
+    String stream = WORKING_DIRECTORY;
+    Set<String> streamNames = new HashSet<>();
+    streamNames.add(stream);
+    generateAvroDataFiles();
+    Map<String, SystemStreamMetadata> streamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+    SystemStreamMetadata systemStreamMetadata = streamMetadataMap.get(stream);
+    Assert.assertEquals(NUM_FILES, systemStreamMetadata.getSystemStreamPartitionMetadata().size());
+
+    // create consumer and read from files
+    HdfsSystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, new NoOpMetricsRegistry());
+    Partition partition = new Partition(0);
+    SystemStreamPartition ssp = new SystemStreamPartition(SYSTEM_NAME, stream, partition);
+    try {
+      systemConsumer.register(ssp, "0");
+      Assert.fail("Empty staging directory should fail system consumer");
+    } catch (UncheckedExecutionException e) {
+      Assert.assertTrue(e.getCause() instanceof SamzaException);
+    }
+  }
 }