SAMZA-1189: Fix file system closed issue on hdfs system producer
authorJacob Maes <jmaes@linkedin.com>
Wed, 5 Apr 2017 03:37:32 +0000 (20:37 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 5 Apr 2017 03:37:32 +0000 (20:37 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Jagadish <jvenkatr@linkedin.com>

Closes #111 from jmakes/samza-1189

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala

index 24bc8b5..d2967ca 100644 (file)
@@ -34,7 +34,7 @@ import scala.collection.mutable.{Map => MMap}
 class HdfsSystemProducer(
   systemName: String, clientId: String, config: HdfsConfig, metrics: HdfsSystemProducerMetrics,
   val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
-  val dfs = FileSystem.get(new Configuration(true))
+  val dfs = FileSystem.newInstance(new Configuration(true))
   val writers: MMap[String, HdfsWriter[_]] = MMap.empty[String, HdfsWriter[_]]
   private val lock = new Object //synchronization lock for thread safe access