SAMZA-1618: fix HdfsFileSystemAdapter to get files recursively
author Hai Lu <halu@linkedin.com>
Tue, 13 Mar 2018 19:28:11 +0000 (12:28 -0700)
committer xiliu <xiliu@linkedin.com>
Tue, 13 Mar 2018 19:28:11 +0000 (12:28 -0700)
fix HdfsFileSystemAdapter to get files recursively

Author: Hai Lu <halu@linkedin.com>

Reviewers: Xinyu Liu <xinyuliu.us@gmail.com>

Closes #447 from lhaiesp/master

samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java
samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java
samza-hdfs/src/test/resources/partitioner/subfolder/testfile002 [new file with mode: 0644]

index bb7b3fa..07caaf7 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +43,11 @@ public class HdfsFileSystemAdapter implements FileSystemAdapter {
       FileSystem fileSystem = streamPath.getFileSystem(new Configuration());
       FileStatus[] fileStatuses = fileSystem.listStatus(streamPath);
       for (FileStatus fileStatus : fileStatuses) {
-        ret.add(new FileMetadata(fileStatus.getPath().toString(), fileStatus.getLen()));
+        if (!fileStatus.isDirectory()) {
+          ret.add(new FileMetadata(fileStatus.getPath().toString(), fileStatus.getLen()));
+        } else {
+          ret.addAll(getAllFiles(fileStatus.getPath().toString()));
+        }
       }
     } catch (IOException e) {
       LOG.error("Failed to get the list of files for " + streamName, e);
index 0fb461f..a20e285 100644 (file)
@@ -38,7 +38,7 @@ public class TestHdfsFileSystemAdapter {
     FileSystemAdapter adapter = new HdfsFileSystemAdapter();
     List<FileSystemAdapter.FileMetadata> result =
       adapter.getAllFiles(url.getPath());
-    Assert.assertEquals(2, result.size());
+    Assert.assertEquals(3, result.size());
   }
 
   @Test
diff --git a/samza-hdfs/src/test/resources/partitioner/subfolder/testfile002 b/samza-hdfs/src/test/resources/partitioner/subfolder/testfile002
new file mode 100644 (file)
index 0000000..fe3e3b6
--- /dev/null
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.