GIRAPH-1034 Allow IPs for Worker2Worker communication
authorSergey Edunov <edunov@fb.com>
Tue, 20 Oct 2015 00:35:58 +0000 (17:35 -0700)
committerSergey Edunov <edunov@fb.com>
Tue, 20 Oct 2015 00:35:58 +0000 (17:35 -0700)
Test Plan:
Run several jobs in unreliable DNS environment.  With and without -Dgiraph.preferIP=true
Without this options job fail, but pass otherwise.

Reviewers: dionysis.logothetis, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D48825

16 files changed:
giraph-core/src/main/java/org/apache/giraph/comm/MasterServer.java
giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
giraph-core/src/main/java/org/apache/giraph/graph/TaskInfo.java
giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java

index de991f8..4a70979 100644 (file)
@@ -32,6 +32,13 @@ public interface MasterServer {
   InetSocketAddress getMyAddress();
 
   /**
+   * Get server host or IP
+   *
+   * @return Server host or IP
+   */
+  String getLocalHostOrIp();
+
+  /**
    * Shuts down.
    */
   void close();
index bed07b9..ab7787d 100644 (file)
@@ -43,6 +43,12 @@ public interface WorkerServer<I extends WritableComparable,
   InetSocketAddress getMyAddress();
 
   /**
+   * Get server host name or IP
+   * @return server host name or IP
+   */
+  String getLocalHostOrIp();
+
+  /**
    * Prepare incoming messages for computation, and resolve mutation requests.
    */
   void prepareSuperstep();
index c85250d..8ea11a5 100644 (file)
@@ -434,7 +434,7 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
           !address.getHostName().equals(taskInfo.getHostname()) ||
           address.getPort() != taskInfo.getPort()) {
         address = resolveAddress(maxResolveAddressAttempts,
-            taskInfo.getInetSocketAddress());
+            taskInfo.getHostOrIp(), taskInfo.getPort());
         taskIdAddressMap.put(taskInfo.getTaskId(), address);
       }
       if (address == null || address.getHostName() == null ||
@@ -930,14 +930,16 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
    *
    * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
    *        address
-   * @param address The address we are attempting to resolve
+   * @param hostOrIp Known IP or host name
+   * @param port Target port number
    * @return The successfully resolved address.
    * @throws IllegalStateException if the address is not resolved
    *         in <code>maxResolveAddressAttempts</code> tries.
    */
   private static InetSocketAddress resolveAddress(
-      int maxResolveAddressAttempts, InetSocketAddress address) {
+      int maxResolveAddressAttempts, String hostOrIp, int port) {
     int resolveAttempts = 0;
+    InetSocketAddress address = new InetSocketAddress(hostOrIp, port);
     while (address.isUnresolved() &&
         resolveAttempts < maxResolveAddressAttempts) {
       ++resolveAttempts;
@@ -949,7 +951,7 @@ public class NettyClient implements ResetSuperstepMetricsObserver {
       } catch (InterruptedException e) {
         LOG.warn("resolveAddress: Interrupted.", e);
       }
-      address = new InetSocketAddress(address.getHostName(),
+      address = new InetSocketAddress(hostOrIp,
           address.getPort());
     }
     if (resolveAttempts >= maxResolveAddressAttempts) {
index 60566f9..37f4f04 100644 (file)
@@ -57,6 +57,11 @@ public class NettyMasterServer implements MasterServer {
   }
 
   @Override
+  public String getLocalHostOrIp() {
+    return nettyServer.getLocalHostOrIp();
+  }
+
+  @Override
   public void close() {
     nettyServer.stop();
   }
index d798a5e..28923b8 100644 (file)
@@ -86,7 +86,7 @@ public class NettyServer {
   private final ChannelGroup accepted = new DefaultChannelGroup(
       ImmediateEventExecutor.INSTANCE);
   /** Local hostname */
-  private final String localHostname;
+  private final String localHostOrIp;
   /** Address of the server */
   private InetSocketAddress myAddress;
   /** Current task info */
@@ -163,7 +163,7 @@ public class NettyServer {
             "netty-server-worker-%d", exceptionHandler));
 
     try {
-      this.localHostname = conf.getLocalHostname();
+      this.localHostOrIp = conf.getLocalHostOrIp();
     } catch (UnknownHostException e) {
       throw new IllegalStateException("NettyServer: unable to get hostname");
     }
@@ -344,7 +344,7 @@ public class NettyServer {
     // Round up the max number of workers to the next power of 10 and use
     // it as a constant to increase the port number with.
     while (bindAttempts < maxIpcPortBindAttempts) {
-      this.myAddress = new InetSocketAddress(localHostname, bindPort);
+      this.myAddress = new InetSocketAddress(localHostOrIp, bindPort);
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
         if (LOG.isInfoEnabled()) {
           LOG.info("start: Intentionally fail first " +
@@ -414,5 +414,9 @@ public class NettyServer {
     return myAddress;
   }
 
+  public String getLocalHostOrIp() {
+    return localHostOrIp;
+  }
+
 }
 
index 600cb1a..c10d49d 100644 (file)
@@ -87,6 +87,11 @@ public class NettyWorkerServer<I extends WritableComparable,
   }
 
   @Override
+  public String getLocalHostOrIp() {
+    return nettyServer.getLocalHostOrIp();
+  }
+
+  @Override
   public void prepareSuperstep() {
     serverData.prepareSuperstep(); // updates the current message-store
   }
index e6931de..6b00645 100644 (file)
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
@@ -1114,6 +1115,19 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Return local host name by default. Or local host IP if preferIP
+   * option is set.
+   * @return local host name or IP
+   * @throws UnknownHostException
+   */
+  public String getLocalHostOrIp() throws UnknownHostException {
+    if (GiraphConstants.PREFER_IP_ADDRESSES.get(this)) {
+      return InetAddress.getLocalHost().getHostAddress();
+    }
+    return getLocalHostname();
+  }
+
+  /**
    * Set the maximum number of supersteps of this application.  After this
    * many supersteps are executed, the application will shutdown.
    *
index 5a0328b..e74703e 100644 (file)
@@ -1212,5 +1212,13 @@ public interface GiraphConstants {
       ClassConfOption.create("giraph.hadoopOutputFormatClass",
           BspOutputFormat.class, OutputFormat.class,
           "Output format class for hadoop to use (for committing)");
+
+  /**
+   * For worker to worker communication we can use IPs or host names, by
+   * default prefer IPs.
+   */
+  BooleanConfOption PREFER_IP_ADDRESSES =
+      new BooleanConfOption("giraph.preferIP", false,
+      "Prefer IP addresses instead of host names");
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck
index ab3ac64..aecb094 100644 (file)
@@ -35,6 +35,8 @@ public abstract class TaskInfo implements Writable {
   private int port;
   /** Task partition id */
   private int taskId = -1;
+  /** Task host IP */
+  private String hostOrIp;
 
   /**
    * Constructor
@@ -52,6 +54,15 @@ public abstract class TaskInfo implements Writable {
   }
 
   /**
+   * Get this task's host address. Could be IP.
+   *
+   * @return host address
+   */
+  public String getHostOrIp() {
+    return hostOrIp;
+  }
+
+  /**
    * Get port that the IPC server of this task is using
    *
    * @return Port
@@ -64,10 +75,12 @@ public abstract class TaskInfo implements Writable {
    * Set address that the IPC server of this task is using
    *
    * @param address Address
+   * @param host host name or IP
    */
-  public void setInetSocketAddress(InetSocketAddress address) {
+  public void setInetSocketAddress(InetSocketAddress address, String host) {
     this.port = address.getPort();
     this.hostname = address.getHostName();
+    this.hostOrIp = host;
   }
 
   /**
@@ -76,7 +89,7 @@ public abstract class TaskInfo implements Writable {
    * @return InetSocketAddress of the hostname and port.
    */
   public InetSocketAddress getInetSocketAddress() {
-    return new InetSocketAddress(hostname, port);
+    return new InetSocketAddress(hostOrIp, port);
   }
 
   /**
@@ -111,6 +124,7 @@ public abstract class TaskInfo implements Writable {
     if (other instanceof TaskInfo) {
       TaskInfo taskInfo = (TaskInfo) other;
       if (getHostname().equals(taskInfo.getHostname()) &&
+          getHostOrIp().equals(taskInfo.getHostOrIp()) &&
           (getTaskId() == taskInfo.getTaskId()) &&
           (port == taskInfo.getPort() &&
           (taskId == taskInfo.getTaskId()))) {
@@ -123,6 +137,7 @@ public abstract class TaskInfo implements Writable {
   @Override
   public String toString() {
     return "hostname=" + getHostname() +
+        " hostOrIp=" + getHostOrIp() +
         ", MRtaskID=" + getTaskId() +
         ", port=" + getPort();
   }
@@ -130,6 +145,7 @@ public abstract class TaskInfo implements Writable {
   @Override
   public void readFields(DataInput input) throws IOException {
     hostname = input.readUTF();
+    hostOrIp = input.readUTF();
     port = input.readInt();
     taskId = input.readInt();
   }
@@ -137,6 +153,7 @@ public abstract class TaskInfo implements Writable {
   @Override
   public void write(DataOutput output) throws IOException {
     output.writeUTF(hostname);
+    output.writeUTF(hostOrIp);
     output.writeInt(port);
     output.writeInt(taskId);
   }
@@ -146,7 +163,9 @@ public abstract class TaskInfo implements Writable {
     int result = 17;
     result = 37 * result + getPort();
     result = 37 * result + hostname.hashCode();
+    result = 37 * result + hostOrIp.hashCode();
     result = 37 * result + getTaskId();
     return result;
   }
+
 }
index 0e7bb9d..cc70b17 100644 (file)
@@ -847,7 +847,8 @@ public class BspServiceMaster<I extends WritableComparable,
           masterServer =
               new NettyMasterServer(getConfiguration(), this, getContext(),
                   getGraphTaskManager().createUncaughtExceptionHandler());
-          masterInfo.setInetSocketAddress(masterServer.getMyAddress());
+          masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
+              masterServer.getLocalHostOrIp());
           masterInfo.setTaskId(getTaskPartition());
           masterClient =
               new NettyMasterClient(getContext(), getConfiguration(), this,
index 1031bb3..1062479 100644 (file)
@@ -204,7 +204,8 @@ public class BspServiceWorker<I extends WritableComparable,
     workerInfo = new WorkerInfo();
     workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
         graphTaskManager.createUncaughtExceptionHandler());
-    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+    workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
+        workerServer.getLocalHostOrIp());
     workerInfo.setTaskId(getTaskPartition());
     workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
         graphTaskManager.createUncaughtExceptionHandler());
index 5bc9ef0..920935d 100644 (file)
@@ -72,7 +72,7 @@ public class ConnectionTest {
             new WorkerRequestServerHandler.Factory(serverData), workerInfo,
             context, new MockExceptionHandler());
     server.start();
-    workerInfo.setInetSocketAddress(server.getMyAddress());
+    workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
 
     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
@@ -105,7 +105,7 @@ public class ConnectionTest {
         new NettyServer(conf, requestServerHandlerFactory, workerInfo1,
             context, new MockExceptionHandler());
     server1.start();
-    workerInfo1.setInetSocketAddress(server1.getMyAddress());
+    workerInfo1.setInetSocketAddress(server1.getMyAddress(), server1.getLocalHostOrIp());
 
     WorkerInfo workerInfo2 = new WorkerInfo();
     workerInfo1.setTaskId(2);
@@ -113,7 +113,7 @@ public class ConnectionTest {
         new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
             context, new MockExceptionHandler());
     server2.start();
-    workerInfo2.setInetSocketAddress(server2.getMyAddress());
+    workerInfo2.setInetSocketAddress(server2.getMyAddress(), server1.getLocalHostOrIp());
 
     WorkerInfo workerInfo3 = new WorkerInfo();
     workerInfo1.setTaskId(3);
@@ -121,7 +121,7 @@ public class ConnectionTest {
         new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
             context, new MockExceptionHandler());
     server3.start();
-    workerInfo3.setInetSocketAddress(server3.getMyAddress());
+    workerInfo3.setInetSocketAddress(server3.getMyAddress(), server1.getLocalHostOrIp());
 
     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
@@ -153,7 +153,7 @@ public class ConnectionTest {
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
             context, new MockExceptionHandler());
     server.start();
-    workerInfo.setInetSocketAddress(server.getMyAddress());
+    workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
 
     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
     NettyClient client1 = new NettyClient(context, conf, new WorkerInfo(),
index 0bea783..2785217 100644 (file)
@@ -163,7 +163,7 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
             context, new MockExceptionHandler());
     server.start();
-    workerInfo.setInetSocketAddress(server.getMyAddress());
+    workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
     client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
     client.connectAllAddresses(
index aa3916c..2688da1 100644 (file)
@@ -98,7 +98,7 @@ public class RequestTest {
             context, new MockExceptionHandler());
     server.start();
 
-    workerInfo.setInetSocketAddress(server.getMyAddress());
+    workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
     client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
     client.connectAllAddresses(
index 96ce062..c63d538 100644 (file)
@@ -88,7 +88,7 @@ public class SaslConnectionTest {
             mockedSaslServerFactory,
             new MockExceptionHandler());
     server.start();
-    workerInfo.setInetSocketAddress(server.getMyAddress());
+    workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp());
 
     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
index 57bebbd..4bb132b 100644 (file)
@@ -48,7 +48,7 @@ public class SimpleRangePartitionFactoryTest {
     ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
     for (int i = 0; i < numWorkers; i++) {
       WorkerInfo info = new WorkerInfo();
-      info.setInetSocketAddress(new InetSocketAddress(8080));
+      info.setInetSocketAddress(new InetSocketAddress(8080), "127.0.0.1");
       info.setTaskId(i);
       infos.add(info);
     }