SAMZA-1026: HDFS System Producer should not have Kafka dependency
author: Prateek Maheshwari <pmaheshw@linkedin.com>
Wed, 26 Apr 2017 23:55:25 +0000 (16:55 -0700)
committer: nramesh <nramesh@linkedin.com>
Wed, 26 Apr 2017 23:55:25 +0000 (16:55 -0700)
Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Navina Ramesh <nramesh@linkedin.com>

Closes #144 from prateekm/hdfs-kafka-dependency

build.gradle
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala

index b6abd47..dc56077 100644 (file)
@@ -512,7 +512,6 @@ project(":samza-hdfs_$scalaVersion") {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
-    compile project(":samza-kafka_$scalaVersion")
     // currently hdfs system producer/consumer do depend on yarn for two things:
     // 1. staging directory 2. security
     // SAMZA-1032 to solve the staging directory dependency
index 12c93ae..52e19bf 100644 (file)
@@ -83,7 +83,7 @@ object HdfsConfig {
   val STAGING_DIRECTORY = "systems.%s.stagingDirectory"
   val STAGING_DIRECTORY_DEFAULT = ""
 
-  implicit def Hdfs2Kafka(config: Config) = new HdfsConfig(config)
+  implicit def Config2Hdfs(config: Config) = new HdfsConfig(config)
 
 }
 
index 3673431..05d717a 100644 (file)
 package org.apache.samza.system.hdfs
 
 
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, ConfigException, JobConfig}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.hdfs.HdfsSystemConsumer.HdfsSystemConsumerMetrics
-import org.apache.samza.util.{KafkaUtil, Logging}
+import org.apache.samza.util.Logging
 
 
 class HdfsSystemFactory extends SystemFactory with Logging {
@@ -33,8 +33,11 @@ class HdfsSystemFactory extends SystemFactory with Logging {
   }
 
   def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
-    // TODO: SAMZA-1026: should remove Kafka dependency below
-    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val jobConfig = new JobConfig(config)
+    val jobName = jobConfig.getName.getOrElse(throw new ConfigException("Missing job name."))
+    val jobId = jobConfig.getJobId.getOrElse("1")
+
+    val clientId = getClientId("samza-producer", jobName, jobId)
     val metrics = new HdfsSystemProducerMetrics(systemName, registry)
     new HdfsSystemProducer(systemName, clientId, config, metrics)
   }
@@ -42,4 +45,11 @@ class HdfsSystemFactory extends SystemFactory with Logging {
   def getAdmin(systemName: String, config: Config) = {
     new HdfsSystemAdmin(systemName, config)
   }
+
+  def getClientId(id: String, jobName: String, jobId: String): String = {
+    "%s-%s-%s" format
+      (id.replaceAll("[^A-Za-z0-9]", "_"),
+        jobName.replaceAll("[^A-Za-z0-9]", "_"),
+        jobId.replaceAll("[^A-Za-z0-9]", "_"))
+  }
 }
index d2967ca..79bca5b 100644 (file)
 package org.apache.samza.system.hdfs
 
 
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.conf.Configuration
-import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
-import org.apache.samza.system.hdfs.HdfsConfig._
-import org.apache.samza.system.{SystemProducer, OutgoingMessageEnvelope}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.samza.system.hdfs.writer.HdfsWriter
-import org.apache.samza.util.{Logging, ExponentialSleepStrategy, TimerUtils, KafkaUtil}
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer}
+import org.apache.samza.util.{Logging, TimerUtils}
+
 import scala.collection.mutable.{Map => MMap}