SAMZA-1738: Merge in some minor additions from Linkedin branch
authorCameron Lee <calee@linkedin.com>
Thu, 28 Jun 2018 16:10:44 +0000 (09:10 -0700)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Thu, 28 Jun 2018 16:10:44 +0000 (09:10 -0700)
Author: Cameron Lee <calee@linkedin.com>

Reviewers: Yi Pan <nickpan47@gmail.com>

Closes #549 from cameronlee314/sync_li_trunk

15 files changed:
samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java [new file with mode: 0644]
samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java

diff --git a/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java b/samza-core/src/main/java/org/apache/samza/executors/KeyBasedExecutorService.java
new file mode 100644 (file)
index 0000000..a7c19d2
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.executors;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This class supports submitting {@link Runnable} tasks with an ordering key, so that tasks submitted with the
+ * same key will always be executed in order, but tasks across different keys can be executed in parallel and out of
+ * order.
+ * Ordering is achieved by hashing the key objects to threads via their {@link Object#hashCode()} method.
+ * Ordering is guaranteed only when using the {@link #submitOrdered(Object, Runnable)} method. None of the
+ * {@link #submit} and {@link #execute(Runnable)} method(s) guarantee the ordering semantics.
+ */
+public class KeyBasedExecutorService extends AbstractExecutorService {
+  final String threadPoolNamePrefix;
+  final ExecutorService[] executors;
+  final Random rand = new Random();
+  final int numThreads;
+
+  public KeyBasedExecutorService(int numThreads) {
+    this("KeyBasedExecutor", numThreads);
+  }
+
+  /**
+   * Constructs an instance of a KeyBasedExecutorService that manages the underlying threads
+   *
+   * @param threadPoolNamePrefix String identifier for this ExecutorService. It forms the prefix for each of the
+   *                             underlying thread pool executors
+   * @param numThreads Total number of threads required, mainly dependent on the key set size and the degree of
+   *                   parallelism. Highest level of parallelism can be achieved by setting the
+   *                   number of threads = key set size.
+   * @throws IllegalArgumentException if numThreads {@literal <=} 0
+   */
+  public KeyBasedExecutorService(String threadPoolNamePrefix,
+      int numThreads) {
+    if (numThreads <= 0) {
+      throw new IllegalArgumentException("numThreads has to be greater than 0 in KeyBasedExecutor!");
+    }
+    this.numThreads = numThreads;
+    this.threadPoolNamePrefix = threadPoolNamePrefix;
+    this.executors = new ExecutorService[numThreads];
+    final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+
+    for (int i = 0; i < numThreads; i++) {
+      final ExecutorService threadPoolExecutorPerQueue = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder()
+              .setThreadFactory(defaultThreadFactory)
+              .setNameFormat(this.threadPoolNamePrefix + "-" + i + "-%d")
+              .build()
+      );
+      executors[i] = threadPoolExecutorPerQueue;
+    }
+  }
+
+  protected ExecutorService chooseRandomExecutor() {
+    if (executors.length == 1) {
+      return executors[0];
+    }
+    return executors[rand.nextInt(executors.length)];
+  }
+
+  protected ExecutorService chooseExecutor(Object object) {
+    if (executors.length == 1) {
+      return executors[0];
+    }
+    return executors[signSafeMod(object.hashCode(), executors.length)];
+  }
+
+  private static int signSafeMod(long dividend, int divisor) {
+    int mod = (int) (dividend % divisor);
+    if (mod < 0) {
+      mod += divisor;
+    }
+    return mod;
+  }
+
+  @Override
+  public void shutdown() {
+    for (int i = 0; i < executors.length; i++) {
+      executors[i].shutdown();
+    }
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    List<Runnable> unexecutedRunnables = new ArrayList<>();
+    for (int i = 0; i < executors.length; i++) {
+      List<Runnable> unexecutedRunnablesPerQueue = executors[i].shutdownNow();
+      if (unexecutedRunnablesPerQueue != null && unexecutedRunnablesPerQueue.size() > 0) {
+        unexecutedRunnables.addAll(unexecutedRunnablesPerQueue);
+      }
+    }
+    return unexecutedRunnables;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    boolean ret = true;
+    for (int i = 0; i < executors.length; i++) {
+      ret = ret && executors[i].isShutdown();
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    boolean ret = true;
+    for (int i = 0; i < executors.length; i++) {
+      ret = ret && executors[i].isTerminated();
+    }
+    return ret;
+  }
+
+  /**
+   * Awaits termination of each of the underlying threads
+   *
+   * Note: This can potentially block longer than the given timeout, since the timeout applies for each of the
+   * underlying threads.
+   *
+   * @param timeout time to wait for each thread to terminate
+   * @param unit unit of time for specifying timeout
+   * @return true if all underlying executors terminated within their timeouts; false otherwise
+   * @throws InterruptedException thrown when the current executing thread is interrupted
+   */
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    boolean ret = true;
+    for (int i = 0; i < executors.length; i++) {
+      ret = ret && executors[i].awaitTermination(timeout, unit);
+    }
+    return ret;
+  }
+
+  public Future<?> submitOrdered(Object key, Runnable task) {
+    return chooseExecutor(key).submit(task);
+  }
+
+  /**
+   * Executes the given {@link Runnable} task in a randomly chosen thread-pool
+   * @param command An instance of the {@link Runnable} task
+   */
+  @Override
+  public void execute(Runnable command) {
+    chooseRandomExecutor().execute(command);
+  }
+}
index 206eb8f..ab11785 100644 (file)
@@ -101,10 +101,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case _ => None
   }
 
-  def getLifecycleListeners(): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENERS)
-
-  def getLifecycleListenerClass(name: String): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENER format name)
-
   def getTaskClass = getOption(TaskConfig.TASK_CLASS)
 
   def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
index a738616..b082a95 100644 (file)
@@ -91,7 +91,10 @@ class RunLoop (
       window
       commit
       val totalNs = clock() - loopStartTime
-      metrics.utilization.set(activeNs.toFloat / totalNs)
+
+      if (totalNs != 0) {
+        metrics.utilization.set(activeNs.toFloat / totalNs)
+      }
       activeNs = 0L
     }
   }
index e537a91..7b7d41f 100644 (file)
@@ -48,6 +48,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   private static SystemConsumer mockConsumer = null;
   private static boolean useCachedConsumer = false;
 
+  public MockCoordinatorStreamSystemFactory() {
+    disableMockConsumerCache();
+  }
+
   public static void enableMockConsumerCache() {
     mockConsumer = null;
     useCachedConsumer = true;
@@ -74,8 +78,8 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
    *               ch:source:taskname -> changelogPartition for changelog
    *               Everything else is processed as normal config
    */
+  @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
-
     if (useCachedConsumer && mockConsumer != null) {
       return mockConsumer;
     }
@@ -104,6 +108,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   /**
    * Returns a MockCoordinatorSystemProducer.
    */
+  @Override
   public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
     return new MockSystemProducer(null);
   }
@@ -124,6 +129,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
    * Returns a single partition admin that pretends to create a coordinator
    * stream.
    */
+  @Override
   public SystemAdmin getAdmin(String systemName, Config config) {
     return new MockSystemAdmin();
   }
diff --git a/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java b/samza-core/src/test/java/org/apache/samza/executors/TestKeyBasedExecutorService.java
new file mode 100644 (file)
index 0000000..fbd0f92
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.executors;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TestKeyBasedExecutorService {
+
+  @Test
+  public void testSubmitOrdered() {
+    KeyBasedExecutorService executorService = new KeyBasedExecutorService("test", 2);
+    ConcurrentLinkedQueue<Integer> resultFromThread0 = new ConcurrentLinkedQueue<>();
+    ConcurrentLinkedQueue<Integer> resultFromThread1 = new ConcurrentLinkedQueue<>();
+
+    final CountDownLatch shutdownLatch = new CountDownLatch(10);
+
+    for (int i = 0; i < 10; i++) {
+      final int currentStep = i;
+      executorService.submitOrdered(currentStep, new Runnable() {
+        @Override
+        public void run() {
+          String threadName = Thread.currentThread().getName();
+          Pattern compiledPattern = Pattern.compile("test-(.+)-0");
+          Matcher matcher = compiledPattern.matcher(threadName);
+          if (matcher.find()) {
+            String threadPoolNumber = matcher.group(1);
+            if ("0".equals(threadPoolNumber)) {
+              resultFromThread0.add(currentStep);
+            } else if ("1".equals(threadPoolNumber)) {
+              resultFromThread1.add(currentStep);
+            }
+            shutdownLatch.countDown();
+          }
+        }
+      });
+    }
+    try {
+      shutdownLatch.await(2, TimeUnit.SECONDS);
+      executorService.shutdown();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    Assert.assertEquals(5, resultFromThread0.size());
+    Assert.assertEquals(5, resultFromThread1.size());
+
+    Iterator<Integer> iterator = resultFromThread0.iterator();
+    int i = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(i, iterator.next().intValue());
+      i += 2;
+    }
+    iterator = resultFromThread1.iterator();
+    i = 1;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(i, iterator.next().intValue());
+      i += 2;
+    }
+
+  }
+}
index e5482a9..423b68a 100644 (file)
@@ -207,6 +207,7 @@ class BrokerProxy(
    * TopicAndPartition.
    */
   def abdicateAll {
+    info("Abdicating all topic partitions.")
     val immutableNextOffsetsCopy = nextOffsets.toMap
     immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
   }
@@ -234,7 +235,10 @@ class BrokerProxy(
       warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
       KafkaUtil.maybeThrowException(e.exception) })
 
-    notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))
+    notLeaderOrUnknownTopic.foreach(e => {
+      warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
+      abdicate(e.tp)
+    })
 
     offsetOutOfRangeErrors.foreach(e => {
       warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
@@ -245,7 +249,7 @@ class BrokerProxy(
         nextOffsets.replace(e.tp, newOffset)
       } catch {
         // UnknownTopic or NotLeader are routine events and handled via abdication.  All others, bail.
-        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating")
+        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
                                                                                              abdicate(e.tp)
       }
     })
index 4cebb82..fd84c4a 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.TopicMetadataStore
+import kafka.api.PartitionMetadata
 import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
 import java.util.concurrent.ConcurrentHashMap
@@ -167,17 +168,19 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+    info("Creating new broker proxy for host: %s and port: %s" format(host, port))
     new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
   }
 
-  protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
+  protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = {
+    topicMetadata.partitionsMetadata.find(_.partitionId == partition)
+  }
+
+  protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
     // Whatever we do, we can't say Broker, even though we're
     // manipulating it here. Broker is a private type and Scala doesn't seem
     // to care about that as long as you don't explicitly declare its type.
-    val brokerOption = topicMetadata
-      .partitionsMetadata
-      .find(_.partitionId == partition)
-      .flatMap(_.leader)
+    val brokerOption = partitionMetadata.flatMap(_.leader)
 
     brokerOption match {
       case Some(broker) => Some(broker.host, broker.port)
@@ -207,8 +210,10 @@ private[kafka] class KafkaSystemConsumer(
             // critical section. If we don't, then notAValidEvent it.
             topicPartitionsAndOffsets.get(head) match {
               case Some(nextOffset) =>
-                getHostPort(topicMetadata(head.topic), head.partition) match {
+                val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition)
+                getLeaderHostPort(partitionMetadata) match {
                   case Some((host, port)) =>
+                    debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get))
                     val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
                     brokerProxy.addTopicPartition(head, Option(nextOffset))
                     brokerProxy.start
index ff945da..51545a0 100644 (file)
@@ -82,10 +82,10 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
     reads.get(topicAndPartition).inc;
   }
   def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
-    reads.get(topicAndPartition).inc(inc);
+    bytesRead.get(topicAndPartition).inc(inc);
   }
-  def incBrokerBytesReads(host: String, port: Int, inc: Long) {
-    brokerReads.get((host,port)).inc(inc)
+  def incBrokerBytesReads(host: String, port: Int, incBytes: Long) {
+    brokerBytesRead.get((host,port)).inc(incBytes)
   }
   def incBrokerSkippedFetchRequests(host: String, port: Int) {
     brokerSkippedFetchRequests.get((host,port)).inc()
index 0f91622..4cbdc7f 100644 (file)
@@ -34,7 +34,10 @@ class ClientUtilTopicMetadataStore(brokersListString: String, clientId: String,
 
   def getTopicInfo(topics: Set[String]) = {
     val currCorrId = corrID.getAndIncrement
+
+    debug("Fetching topic metadata.")
     val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
+    debug("Got topic metadata response: %s" format(response))
 
     if (response.correlationId != currCorrId) {
       throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId))
index 5b0137a..601ffa2 100644 (file)
 
 package org.apache.samza.util
 
-import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
 import kafka.utils.ZkUtils
 import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.config.ApplicationConfig.ApplicationMode
-import org.apache.samza.config.{ApplicationConfig, Config, ConfigException}
+import org.apache.samza.config.{Config, ConfigException}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.execution.StreamManager
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import kafka.common.ErrorMapping
-import org.apache.kafka.common.errors.TopicExistsException
-import org.apache.samza.system.kafka.TopicMetadataCache
 
 object KafkaUtil extends Logging {
   /**
index a533acc..cd511f2 100644 (file)
@@ -134,7 +134,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
 
   def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
     new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
-      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
+      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false)
   }
 
 }
index 4dd170f..8656d10 100644 (file)
@@ -72,7 +72,7 @@ class TestKafkaSystemConsumer {
     var hosts = List[String]()
     var getHostPortCount = 0
     val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
-      override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
+      override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
         // Generate a unique host every time getHostPort is called.
         getHostPortCount += 1
         Some("localhost-%s" format getHostPortCount, 0)
index 45b6a39..f7d5823 100644 (file)
@@ -19,7 +19,6 @@
 package org.apache.samza.rest;
 
 import java.util.Collection;
-import java.util.List;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.rest.resources.DefaultResourceFactory;
index 2f940e3..b7e8b5a 100644 (file)
@@ -66,9 +66,9 @@ public class SamzaRestService {
   private final Map<String, MetricsReporter> metricsReporters;
 
   public SamzaRestService(Server server,
-                          ReadableMetricsRegistry metricsRegistry,
-                          Map<String, MetricsReporter> metricsReporters,
-                          ServletContextHandler context) {
+      ReadableMetricsRegistry metricsRegistry,
+      Map<String, MetricsReporter> metricsReporters,
+      ServletContextHandler context) {
     this.server = server;
     this.metricsRegistry = metricsRegistry;
     this.metricsReporters = metricsReporters;
@@ -92,9 +92,10 @@ public class SamzaRestService {
       ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
       log.info("Creating new SamzaRestService with config: {}", config);
       MetricsConfig metricsConfig = new MetricsConfig(config);
-      Map<String, MetricsReporter> metricsReporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, Util.getLocalHost().getHostName());
+      Map<String, MetricsReporter> metricsReporters = MetricsReporterLoader.getMetricsReporters(metricsConfig,
+          Util.getLocalHost().getHostName());
       SamzaRestService restService = new SamzaRestService(new Server(config.getPort()), metricsRegistry, metricsReporters,
-                                                          new ServletContextHandler(ServletContextHandler.SESSIONS));
+          new ServletContextHandler(ServletContextHandler.SESSIONS));
 
       // Add applications
       SamzaRestApplication samzaRestApplication = new SamzaRestApplication(config);
@@ -108,8 +109,8 @@ public class SamzaRestService {
       ScheduledExecutorService schedulingService = Executors.newScheduledThreadPool(1, threadFactory);
       schedulingProvider = new ScheduledExecutorSchedulingProvider(schedulingService);
       SamzaMonitorService monitorService = new SamzaMonitorService(config,
-                                                                   metricsRegistry,
-                                                                   schedulingProvider);
+          metricsRegistry,
+          schedulingProvider);
       monitorService.start();
 
       restService.runBlocking();
index aa4bc3e..466b8cf 100644 (file)
@@ -90,12 +90,6 @@ public class YarnConfig extends MapConfig {
   private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
 
   /**
-   * Flag to indicate if host-affinity is enabled for the job or not
-   */
-  public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
-  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
-
-  /**
    * Principal used to log in on a Kerberized secure cluster
    */
   public static final String YARN_KERBEROS_PRINCIPAL = "yarn.kerberos.principal";
@@ -177,10 +171,6 @@ public class YarnConfig extends MapConfig {
     return getInt(CONTAINER_REQUEST_TIMEOUT_MS, DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS);
   }
 
-  public boolean getHostAffinityEnabled() {
-    return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
-  }
-
   public String getYarnKerberosPrincipal() {
     return get(YARN_KERBEROS_PRINCIPAL, null);
   }