SAMZA-2101: code cleanup and refactoring 910/head
authorstrkkk <andreypaykin@gmail.com>
Thu, 7 Feb 2019 10:38:35 +0000 (13:38 +0300)
committerstrkkk <andreypaykin@gmail.com>
Thu, 7 Feb 2019 10:38:35 +0000 (13:38 +0300)
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java
samza-autoscaling/src/test/java/org/apache/samza/autoscaling/utils/YarnUtilTest.java

index d709254..576be6a 100644 (file)
@@ -39,6 +39,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -102,7 +104,7 @@ public class ConfigManager {
     if (config.containsKey(pollingIntervalOpt)) {
       long pollingInterval = config.getLong(pollingIntervalOpt);
       if (pollingInterval <= 0) {
-        throw new IllegalArgumentException("polling interval cannot be a negative value");
+        throw new IllegalArgumentException("polling interval should be greater than 0");
       }
       this.interval = pollingInterval;
     } else {
@@ -119,7 +121,7 @@ public class ConfigManager {
    * Then it reacts accordingly based on the configuration that is being set.
    * The method the calls the start() method to initialized the system, runs in a infinite loop, and calls the stop() method at the end to stop the consumer and the system
    */
-  public void run() {
+  private void run() {
     start();
     try {
       while (true) {
@@ -139,7 +141,7 @@ public class ConfigManager {
   /**
    * Starts the system by starting the consumer
    */
-  public void start() {
+  private void start() {
     register();
     coordinatorStreamConsumer.start();
     coordinatorStreamIterator = coordinatorStreamConsumer.getStartIterator();
@@ -149,7 +151,7 @@ public class ConfigManager {
   /**
    * stops the consumer making the system ready to stop
    */
-  public void stop() {
+  private void stop() {
     coordinatorStreamConsumer.stop();
     coordinatorServerURL = null;
     yarnUtil.stop();
@@ -181,17 +183,15 @@ public class ConfigManager {
    * This method just reads the messages, and it does not react to them or change any configuration of the system.
    */
   private void skipUnreadMessages() {
-    processConfigMessages(new LinkedList<String>());
+    processConfigMessages(Collections.emptyList());
     log.info("Config manager skipped messages");
   }
 
   /**
    * This function reads all the messages with "set-config" type added to the coordinator stream since the last time the method was invoked
    */
-  public void processConfigMessages() {
-    List<String> keysToProcess = new LinkedList<>();
-    keysToProcess.add(YARN_CONTAINER_COUNT_OPT);
-    keysToProcess.add(SERVER_URL_OPT);
+  private void processConfigMessages() {
+    List<String> keysToProcess = Arrays.asList(YARN_CONTAINER_COUNT_OPT, SERVER_URL_OPT);
     processConfigMessages(keysToProcess);
   }
 
@@ -234,7 +234,7 @@ public class ConfigManager {
 
         //TODO: change the handlers to implement a common interface, to make them pluggable
       } catch (Exception e) {
-        log.debug("Error in reading a message, skipping message with key " + key);
+        log.error("Error in reading a message, skipping message with key " + key);
       }
 
     }
@@ -323,7 +323,7 @@ public class ConfigManager {
    *
    * @return current number of tasks in the job
    */
-  public int getCurrentNumTasks() {
+  private int getCurrentNumTasks() {
     int currentNumTasks = 0;
     for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) {
       currentNumTasks += containerModel.getTasks().size();
@@ -337,7 +337,7 @@ public class ConfigManager {
    *
    * @return current number of containers in the job
    */
-  public int getCurrentNumContainers() {
+  private int getCurrentNumContainers() {
     return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size();
   }
 
index cab46b9..7331f61 100644 (file)
@@ -44,12 +44,12 @@ import java.util.Map;
  */
 public class YarnUtil {
   private static final Logger log = LoggerFactory.getLogger(YarnUtil.class);
-  private CloseableHttpClient httpclient;
-  private HttpHost rmServer;
-  private YarnClient yarnClient;
+  private final CloseableHttpClient httpClient;
+  private final HttpHost rmServer;
+  private final YarnClient yarnClient;
 
   public YarnUtil(String rmAddress, int rmPort) {
-    this.httpclient = HttpClientBuilder.create().build();
+    this.httpClient = HttpClientBuilder.create().build();
     this.rmServer = new HttpHost(rmAddress, rmPort, "http");
     log.info("setting rm server to : " + rmServer);
     YarnConfiguration hConfig = new YarnConfiguration();
@@ -70,17 +70,15 @@ public class YarnUtil {
 
     try {
       HttpGet getRequest = new HttpGet("/ws/v1/cluster/apps");
-      HttpResponse httpResponse = httpclient.execute(rmServer, getRequest);
+      HttpResponse httpResponse = httpClient.execute(rmServer, getRequest);
       String applications = EntityUtils.toString(httpResponse.getEntity());
       log.debug("applications: " + applications);
 
       List<Map<String, String>> applicationList = parseYarnApplications(applications);
       String name = jobName + "_" + jobID;
       for (Map<String, String> application : applicationList) {
-        if (application.containsKey("state") && application.containsKey("name") && application.containsKey("id")) {
-          if (application.get("state").toString().equals("RUNNING") && application.get("name").toString().equals(name)) {
-            return application.get("id").toString();
-          }
+        if ("RUNNING".equals(application.get("state")) && name.equals(application.get("name")) && application.containsKey("id")) {
+          return application.get("id");
         }
       }
     } catch (NullPointerException | IOException e) {
@@ -150,7 +148,7 @@ public class YarnUtil {
    */
   public void stop() {
     try {
-      httpclient.close();
+      httpClient.close();
     } catch (IOException e) {
       log.error("HTTP Client failed to close.", e);
     }
index 97ccb2d..7b4b74e 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information