GIRAPH-1054: Separate ThriftService from JobProgressTrackerService on the client
author Avery Ching <aching@fb.com>
Wed, 13 Apr 2016 23:03:40 +0000 (16:03 -0700)
committer Avery Ching <aching@fb.com>
Fri, 22 Apr 2016 17:57:20 +0000 (10:57 -0700)
Summary:
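* Moves the job progress service host/port conf options out of JobProgressTracker into the new ClientThriftServer class, renamed to giraph.client.thrift.server.host and giraph.client.thrift.server.port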
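* Factors the Thrift server start and stop logic out of JobProgressTrackerService#createJobProgressServer (now createJobProgressTrackerService) into the ClientThriftServer constructor and ClientThriftServer#stopThriftServer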
* Allows adding other Thrift services to the ThriftServer

Test Plan: Tried on a cluster

Reviewers: maja.kabiljo, sergey.edunov

Reviewed By: sergey.edunov

Subscribers: sergey.edunov

Differential Revision: https://reviews.facebook.net/D57087

giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java

index b7f0d5c..1e51101 100644 (file)
@@ -17,9 +17,6 @@
  */
 package org.apache.giraph.conf;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.bsp.BspOutputFormat;
@@ -83,6 +80,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.OutputFormat;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 /**
  * Constants used all over Giraph for configuration.
  */
index 9ce12ed..60cb586 100644 (file)
@@ -19,6 +19,7 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.job.ClientThriftServer;
 import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.log4j.Logger;
@@ -78,8 +79,8 @@ public class RetryableJobProgressTrackerClient
         ImmutableSet.<ThriftClientEventHandler>of());
     FramedClientConnector connector =
         new FramedClientConnector(new InetSocketAddress(
-            JOB_PROGRESS_SERVICE_HOST.get(conf),
-            JOB_PROGRESS_SERVICE_PORT.get(conf)));
+            ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf),
+            ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf)));
     jobProgressTracker =
         clientManager.createClient(connector, JobProgressTracker.class).get();
 
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java b/giraph-core/src/main/java/org/apache/giraph/job/ClientThriftServer.java
new file mode 100644 (file)
index 0000000..ce7b5d9
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.job;
+
+import com.facebook.swift.codec.ThriftCodecManager;
+import com.facebook.swift.service.ThriftEventHandler;
+import com.facebook.swift.service.ThriftServer;
+import com.facebook.swift.service.ThriftServerConfig;
+import com.facebook.swift.service.ThriftServiceProcessor;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.conf.StrConfOption;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manages the life cycle of the Thrift server started on the client.
+ */
+public class ClientThriftServer {
+  /**
+   * The client can run a Thrift server (e.g. job progress service).
+   * This is the host of the Thrift server.
+   */
+  public static final StrConfOption CLIENT_THRIFT_SERVER_HOST =
+      new StrConfOption("giraph.client.thrift.server.host", null,
+          "Host on which the client Thrift server runs (if enabled)");
+  /**
+   * The client can run a Thrift server (e.g. job progress service).
+   * This is the port of the Thrift server.
+   */
+  public static final IntConfOption CLIENT_THRIFT_SERVER_PORT =
+      new IntConfOption("giraph.client.thrift.server.port", -1,
+          "Port on which the client Thrift server runs (if enabled)");
+
+  /** Thrift server that is intended to run on the client */
+  private final ThriftServer clientThriftServer;
+
+  /**
+   * Create and start the Thrift server, then publish its host and port in
+   * the conf. The host is resolved first so a failed lookup starts nothing.
+   *
+   * @param conf Giraph conf that receives the server host and port
+   * @param services Services to register with the Thrift server
+   * @throws IllegalStateException if the local hostname cannot be resolved
+   */
+  public ClientThriftServer(GiraphConfiguration conf,
+                            List<?> services) {
+    checkNotNull(conf, "conf is null");
+    checkNotNull(services, "services is null");
+    // Look up the hostname before start() so a failure leaks no server.
+    final String hostname;
+    try {
+      hostname = conf.getLocalHostname();
+    } catch (UnknownHostException e) {
+      throw new IllegalStateException("Unable to get host information", e);
+    }
+    ThriftServiceProcessor processor = new ThriftServiceProcessor(
+        new ThriftCodecManager(), new ArrayList<ThriftEventHandler>(),
+        services);
+    clientThriftServer = new ThriftServer(processor, new ThriftServerConfig());
+    clientThriftServer.start();
+    CLIENT_THRIFT_SERVER_HOST.set(conf, hostname);
+    CLIENT_THRIFT_SERVER_PORT.set(conf, clientThriftServer.getPort());
+  }
+
+  /**
+   * Stop the Thrift server.
+   */
+  public void stopThriftServer() {
+    this.clientThriftServer.close();
+  }
+}
index 8792e59..90a73c6 100644 (file)
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.job;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -240,7 +241,13 @@ public class GiraphJob {
       GiraphJobObserver jobObserver = conf.getJobObserver();
 
       JobProgressTrackerService jobProgressTrackerService =
-          JobProgressTrackerService.createJobProgressServer(conf, jobObserver);
+          JobProgressTrackerService.createJobProgressTrackerService(
+              conf, jobObserver);
+      ClientThriftServer clientThriftServer = null;
+      if (jobProgressTrackerService != null) {
+        clientThriftServer = new ClientThriftServer(
+            conf, ImmutableList.of(jobProgressTrackerService));
+      }
 
       tryCount++;
       Job submittedJob = new Job(conf, jobName);
@@ -271,6 +278,10 @@ public class GiraphJob {
       if (jobProgressTrackerService != null) {
         jobProgressTrackerService.stop(passed);
       }
+      if (clientThriftServer != null) {
+        clientThriftServer.stopThriftServer();
+      }
+
       jobObserver.jobFinished(submittedJob, passed);
 
       if (!passed) {
index 4da5450..3041d08 100644 (file)
 
 package org.apache.giraph.job;
 
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.conf.StrConfOption;
-import org.apache.giraph.worker.WorkerProgress;
-
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
+import org.apache.giraph.worker.WorkerProgress;
 
 /**
  * Interface for job progress tracker on job client
  */
 @ThriftService
 public interface JobProgressTracker {
-  /** Host on which job progress service runs */
-  StrConfOption JOB_PROGRESS_SERVICE_HOST =
-      new StrConfOption("giraph.jobProgressServiceHost", null,
-          "Host on which job progress service runs");
-  /** Port which job progress service uses */
-  IntConfOption JOB_PROGRESS_SERVICE_PORT =
-      new IntConfOption("giraph.jobProgressServicePort", -1,
-          "Port which job progress service uses");
 
   /** Notify JobProgressTracker that mapper started */
   @ThriftMethod
index b08bf3e..c0189c0 100644 (file)
@@ -24,15 +24,7 @@ import org.apache.giraph.worker.WorkerProgress;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 
-import com.facebook.swift.codec.ThriftCodecManager;
-import com.facebook.swift.service.ThriftEventHandler;
-import com.facebook.swift.service.ThriftServer;
-import com.facebook.swift.service.ThriftServerConfig;
-import com.facebook.swift.service.ThriftServiceProcessor;
-
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,8 +46,6 @@ public class JobProgressTrackerService implements JobProgressTracker {
   private Thread writerThread;
   /** Whether application is finished */
   private volatile boolean finished = false;
-  /** Server which uses this service */
-  private ThriftServer server;
   /** Number of mappers which the job got */
   private int mappersStarted;
   /** Last time number of mappers started was logged */
@@ -208,7 +198,6 @@ public class JobProgressTrackerService implements JobProgressTracker {
   public void stop(boolean succeeded) {
     finished = true;
     writerThread.interrupt();
-    server.close();
     if (LOG.isInfoEnabled()) {
       LOG.info("Job " + (succeeded ? "finished successfully" : "failed") +
           ", cleaning up...");
@@ -216,37 +205,18 @@ public class JobProgressTrackerService implements JobProgressTracker {
   }
 
   /**
-   * Create job progress server on job client, and update configuration with
-   * its hostname and port so mappers would know what to connect to. Returns
-   * null if progress shouldn't be tracked
+   * Create the job progress tracker service, or null if tracking is disabled.
    *
    * @param conf Configuration
    * @param jobObserver Giraph job callbacks
    * @return JobProgressTrackerService
    */
-  public static JobProgressTrackerService createJobProgressServer(
+  public static JobProgressTrackerService createJobProgressTrackerService(
       GiraphConfiguration conf, GiraphJobObserver jobObserver) {
     if (!conf.trackJobProgressOnClient()) {
       return null;
     }
-    try {
-      JobProgressTrackerService service =
-          new JobProgressTrackerService(conf, jobObserver);
-      ThriftServiceProcessor processor =
-          new ThriftServiceProcessor(new ThriftCodecManager(),
-              new ArrayList<ThriftEventHandler>(), service);
-      service.server = new ThriftServer(processor, new ThriftServerConfig());
-      service.server.start();
-      JOB_PROGRESS_SERVICE_HOST.set(conf,
-          InetAddress.getLocalHost().getHostName());
-      JOB_PROGRESS_SERVICE_PORT.set(conf, service.server.getPort());
-      return service;
-      // CHECKSTYLE: stop IllegalCatch
-    } catch (Exception e) {
-      // CHECKSTYLE: resume IllegalCatch
-      LOG.warn("Exception occurred while trying to create " +
-          "JobProgressTrackerService - not using progress reporting", e);
-      return null;
-    }
+
+    return new JobProgressTrackerService(conf, jobObserver);
   }
 }