Block API handle
authorDionysios Logothetis <dionysios@fb.com>
Tue, 10 May 2016 17:41:40 +0000 (10:41 -0700)
committerIgor Kabiljo <ikabiljo@fb.com>
Tue, 10 May 2016 17:41:40 +0000 (10:41 -0700)
Summary:
- Some apps need a reference to the Block API objects (e.g. BlockOutputApi) before they are actually executed. See documentation of `BlockApiHandle` for more details.
- Also, made I `BlockWorkerApi` implement the `BlockOutputApi` as opposed to only the `BlockWorkerReceiveApi` so that output is possible inside the sender. too.

Test Plan:
- `mvn install`
- internal app that uses the api handle from the master and from the workers
- internal snapshot tests

Reviewers: maja.kabiljo, sergey.edunov, ikabiljo

Reviewed By: ikabiljo

Differential Revision: https://reviews.facebook.net/D57939

giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java
giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java
giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java [new file with mode: 0644]

index 6bf6d92..49f23c5 100644 (file)
@@ -52,7 +52,7 @@ public class BlockUtils {
       ClassConfOption.create("digraph.block_factory", null, BlockFactory.class,
           "block factory describing giraph job");
 
-  /** Property describing BlockFactory to use for current application run */
+  /** Property describing block worker context value class to use */
   public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
       ClassConfOption.create(
           "digraph.block_worker_context_value_class",
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java
new file mode 100644 (file)
index 0000000..4c52826
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Class that contains references to Block Api objects.
+ *
+ * One general use-case for this is for applications to indirectly get a handle
+ * on the Block Api objects and  implement operations that (i) depend on the
+ * Block Api interfaces, (ii) are not in the context of a Piece when defined,
+ * and (iii) are in the context of a Piece when executed.
+ *
+ * To do this, as opposed to defining an application as a {@link Block}, define
+ * your application as a {@link BlockWithApiHandle}.
+ *
+ * NOTE: Depending on the context in which this class is used, some of the
+ * handles may not be set. For instance, the {@link masterApi} is not set when
+ * this is in the context of a worker. Trying to get access to a handle when
+ * it is not set will result in a runtime exception. Instead, you should first
+ * use methods like the {@link #isMasterApiSet()} to check.
+ *
+ * The *Api fields are transient as we do not need/want to serialize them. They
+ * will be set at the appropriate time by the framework.
+ */
+public class BlockApiHandle {
+  private transient BlockMasterApi masterApi;
+  private transient BlockWorkerReceiveApi workerReceiveApi;
+  private transient BlockWorkerSendApi workerSendApi;
+  private transient BlockWorkerContextReceiveApi workerContextReceiveApi;
+  private transient BlockWorkerContextSendApi workerContextSendApi;
+
+  public void setMasterApi(BlockMasterApi api) {
+    this.masterApi = api;
+  }
+
+  public void setWorkerReceiveApi(BlockWorkerReceiveApi api) {
+    this.workerReceiveApi = api;
+  }
+
+  public void setWorkerSendApi(BlockWorkerSendApi api) {
+    this.workerSendApi = api;
+  }
+
+  public void setWorkerContextReceiveApi(BlockWorkerContextReceiveApi api) {
+    this.workerContextReceiveApi = api;
+  }
+
+  public void setWorkerContextSendApi(BlockWorkerContextSendApi api) {
+    this.workerContextSendApi = api;
+  }
+
+  public boolean isMasterApiSet() {
+    return masterApi != null;
+  }
+
+  public boolean isWorkerReceiveApiSet() {
+    return workerReceiveApi != null;
+  }
+
+  public boolean isWorkerSendApiSet() {
+    return workerSendApi != null;
+  }
+
+  public boolean isWorkerContextReceiveApiSet() {
+    return workerContextReceiveApi != null;
+  }
+
+  public boolean isWorkerContextSendApiSet() {
+    return workerContextSendApi != null;
+  }
+
+  public BlockMasterApi getMasterApi() {
+    checkNotNull(masterApi,
+      "BlockMasterApi not valid in this context.");
+    return masterApi;
+  }
+
+  public BlockWorkerReceiveApi getWorkerReceiveApi() {
+    checkNotNull(workerReceiveApi,
+      "BlockWorkerReceiveApi not valid in this context.");
+    return workerReceiveApi;
+  }
+
+  public BlockWorkerSendApi getWorkerSendApi() {
+    checkNotNull(workerSendApi,
+      "BlockWorkerSendApi not valid in this context.");
+    return workerSendApi;
+  }
+
+  public BlockWorkerContextReceiveApi getWorkerContextReceiveApi() {
+    checkNotNull(workerContextReceiveApi,
+      "BlockWorkerContextReceiveApi not valid in this context");
+    return workerContextReceiveApi;
+  }
+
+  public BlockWorkerContextSendApi getWorkerContextSendApi() {
+    checkNotNull(workerContextSendApi,
+      "BlockWorkerContextSendApi not valid in this context");
+    return workerContextSendApi;
+  }
+}
index 76898f6..f6b3551 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public interface BlockWorkerApi<I extends WritableComparable>
-    extends BlockApi, AggregatorUsage, WorkerIndexUsage<I> {
+    extends BlockApi, BlockOutputApi, AggregatorUsage, WorkerIndexUsage<I> {
   @Override
   ImmutableClassesGiraphConfiguration<I, ?, ?> getConf();
 }
index 6db51bd..b99d319 100644 (file)
@@ -29,5 +29,5 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public interface BlockWorkerReceiveApi<I extends WritableComparable>
-    extends BlockWorkerApi<I>, WorkerBroadcastUsage, BlockOutputApi {
+    extends BlockWorkerApi<I>, WorkerBroadcastUsage {
 }
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java
new file mode 100644 (file)
index 0000000..f2dad87
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import org.apache.giraph.block_app.framework.api.BlockApiHandle;
+
+/**
+ * Applications that need access to a {@link BlockApiHandle} should return a
+ * {@link Block} of this type.
+ */
+public interface BlockWithApiHandle extends Block {
+  BlockApiHandle getBlockApiHandle();
+}
index bd86c21..b6167d9 100644 (file)
@@ -26,9 +26,11 @@ import java.util.TreeMap;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.apache.giraph.block_app.framework.BlockFactory;
 import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.api.BlockApiHandle;
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
 import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.function.Consumer;
@@ -53,6 +55,7 @@ public class BlockMasterLogic<S> {
   private long lastTimestamp = -1;
   private BlockWorkerPieces previousWorkerPieces;
   private boolean computationDone;
+  private BlockApiHandle blockApiHandle;
 
   /** Tracks elapsed time on master for each distinct Piece */
   private final TimeStatsPerEvent masterPerPieceTimeStats =
@@ -82,6 +85,14 @@ public class BlockMasterLogic<S> {
     this.computationDone = false;
 
     LOG.info("Executing application - " + executionBlock);
+    if (executionBlock instanceof BlockWithApiHandle) {
+      blockApiHandle =
+        ((BlockWithApiHandle) executionBlock).getBlockApiHandle();
+    }
+    if (blockApiHandle == null) {
+      blockApiHandle = new BlockApiHandle();
+    }
+    blockApiHandle.setMasterApi(masterApi);
 
     // We register all possible aggregators at the beginning
     executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
@@ -89,7 +100,7 @@ public class BlockMasterLogic<S> {
       @SuppressWarnings("deprecation")
       @Override
       public void apply(AbstractPiece piece) {
-        // no need to regiser the same piece twice.
+        // no need to register the same piece twice.
         if (registeredPieces.add(piece)) {
           try {
             piece.registerAggregators(masterApi);
@@ -178,7 +189,8 @@ public class BlockMasterLogic<S> {
         postApplication();
         result = null;
       } else {
-        result = new BlockWorkerPieces<>(previousPiece, nextPiece);
+        result = new BlockWorkerPieces<>(
+          previousPiece, nextPiece, blockApiHandle);
         if (logExecutionStatus) {
           LOG.info("Master in " + superstep + " superstep passing " +
               result + " to be executed");
index ca2bb5a..a58f78b 100644 (file)
@@ -65,6 +65,8 @@ public class BlockWorkerContextLogic {
       BlockWorkerContextSendApi sendApi,
       BlockWorkerPieces workerPieces, long superstep,
       List<Writable> messages) {
+    workerPieces.getBlockApiHandle().setWorkerContextReceiveApi(receiveApi);
+    workerPieces.getBlockApiHandle().setWorkerContextSendApi(sendApi);
     if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) {
       LOG.info("Worker executing " + workerPieces + " in " + superstep +
           " superstep");
index 844160c..d4f6c3f 100644 (file)
@@ -40,6 +40,8 @@ public class BlockWorkerLogic {
 
   public void preSuperstep(
       BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) {
+    pieces.getBlockApiHandle().setWorkerReceiveApi(receiveApi);
+    pieces.getBlockApiHandle().setWorkerSendApi(sendApi);
     if (pieces.getReceiver() != null) {
       receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi);
     }
index 3b38cfa..545237d 100644 (file)
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Objects;
 
+import org.apache.giraph.block_app.framework.api.BlockApiHandle;
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.conf.DefaultMessageClasses;
 import org.apache.giraph.conf.GiraphConstants;
@@ -52,11 +53,14 @@ public class BlockWorkerPieces<S> {
 
   private final PairedPieceAndStage<S> receiver;
   private final PairedPieceAndStage<S> sender;
+  private final BlockApiHandle blockApiHandle;
 
   public BlockWorkerPieces(
-      PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender) {
+      PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender,
+      BlockApiHandle blockApiHandle) {
     this.receiver = receiver;
     this.sender = sender;
+    this.blockApiHandle = blockApiHandle;
   }
 
   public PairedPieceAndStage<S> getReceiver() {
@@ -67,6 +71,10 @@ public class BlockWorkerPieces<S> {
     return sender;
   }
 
+  public BlockApiHandle getBlockApiHandle() {
+    return blockApiHandle;
+  }
+
   public MessageClasses getOutgoingMessageClasses(
       ImmutableClassesGiraphConfiguration conf) {
     MessageClasses messageClasses;
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java
new file mode 100644 (file)
index 0000000..328b45d
--- /dev/null
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework;
+
+import org.apache.giraph.block_app.framework.api.BlockApiHandle;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.DefaultParentPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Test the use of {@link BlockApiHandle}.
+ */
+public class BlockApiHandleTest {
+
+  private static GiraphConfiguration createConf() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class);
+    return conf;
+  }
+
+  private static TestGraph<LongWritable, LongWritable, NullWritable>
+  createTestGraph() {
+    TestGraph<LongWritable, LongWritable, NullWritable> graph =
+      new TestGraph<>(createConf());
+    graph.addVertex(new LongWritable(1), new LongWritable());
+    graph.addVertex(new LongWritable(2), new LongWritable());
+    graph.addVertex(new LongWritable(3), new LongWritable());
+    graph.addVertex(new LongWritable(4), new LongWritable());
+    graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
+    graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get());
+    graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get());
+    graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get());
+    return graph;
+  }
+
+  public static class DummyObjectWithApiHandle {
+
+    private BlockApiHandle handle;
+
+    public DummyObjectWithApiHandle(BlockApiHandle handle) {
+      this.handle = handle;
+    }
+
+    public void doSomethingAtWorker() {
+      // checking that the handles have been set
+      assertFalse(handle.isMasterApiSet());
+      assertFalse(handle.isWorkerContextReceiveApiSet());
+      assertFalse(handle.isWorkerContextSendApiSet());
+      assertEquals(1, handle.getWorkerReceiveApi().getWorkerCount());
+      assertEquals(0, handle.getWorkerSendApi().getMyWorkerIndex());
+    }
+
+    public void doSomethingAtWorkerContext() {
+      // checking that the handles have been set
+      assertFalse(handle.isMasterApiSet());
+      assertFalse(handle.isWorkerReceiveApiSet());
+      assertFalse(handle.isWorkerSendApiSet());
+      assertEquals(1, handle.getWorkerContextReceiveApi().getWorkerCount());
+      assertEquals(0, handle.getWorkerContextSendApi().getMyWorkerIndex());
+    }
+
+    public void doSomethingAtMaster() {
+      // checking that the handles have been set
+      assertEquals(1, handle.getMasterApi().getWorkerCount());
+      assertFalse(handle.isWorkerReceiveApiSet());
+      assertFalse(handle.isWorkerSendApiSet());
+      assertFalse(handle.isWorkerContextReceiveApiSet());
+      assertFalse(handle.isWorkerContextSendApiSet());
+    }
+  }
+
+  public static class TestPiece extends DefaultParentPiece<WritableComparable,
+      LongWritable, Writable, NullWritable, Object, DoubleWritable, Object> {
+
+    private DummyObjectWithApiHandle object;
+
+    public TestPiece(DummyObjectWithApiHandle object) {
+      this.object = object;
+    }
+
+    @Override
+    public VertexSender<WritableComparable, LongWritable, Writable>
+    getVertexSender(final BlockWorkerSendApi<WritableComparable, LongWritable,
+      Writable, NullWritable> workerApi, Object executionStage) {
+      return new InnerVertexSender() {
+        @Override
+        public void vertexSend(
+          Vertex<WritableComparable, LongWritable, Writable> vertex) {
+          object.doSomethingAtWorker();
+        }
+      };
+    }
+
+    @Override
+    public VertexReceiver<WritableComparable, LongWritable, Writable,
+      NullWritable> getVertexReceiver(
+      BlockWorkerReceiveApi<WritableComparable> workerApi,
+      Object executionStage) {
+      return new InnerVertexReceiver() {
+        @Override
+        public void vertexReceive(
+          Vertex<WritableComparable, LongWritable, Writable> vertex,
+          Iterable<NullWritable> messages) {
+          object.doSomethingAtWorker();
+        }
+      };
+    }
+
+    public void workerContextSend(BlockWorkerContextSendApi<WritableComparable,
+        DoubleWritable> workerContextApi, Object executionStage,
+        Writable workerValue) {
+      object.doSomethingAtWorkerContext();
+    }
+
+    /**
+     * Override to have worker context receive computation.
+     *
+     * Called once per worker, before all vertices are going to be processed
+     * with getVertexReceiver.
+     */
+    public void workerContextReceive(
+      BlockWorkerContextReceiveApi workerContextApi, Object executionStage,
+      Object workerValue, List<DoubleWritable> workerMessages) {
+      object.doSomethingAtWorkerContext();
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi masterApi, Object executionStage) {
+      object.doSomethingAtMaster();
+    }
+
+    @Override
+    protected Class<NullWritable> getMessageClass() {
+      return NullWritable.class;
+    }
+  }
+
+  @Test
+  public void testBlockApiHandle() {
+    TestGraph<LongWritable, LongWritable, NullWritable> graph =
+      createTestGraph();
+
+    final BlockApiHandle handle = new BlockApiHandle();
+    final DefaultParentPiece piece =
+      new TestPiece(new DummyObjectWithApiHandle(handle));
+
+    Block block = new BlockWithApiHandle() {
+      @Override
+      public Iterator<AbstractPiece> iterator() {
+        return piece.iterator();
+      }
+
+      @Override
+      public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+        piece.forAllPossiblePieces(consumer);
+      }
+
+      @Override
+      public BlockApiHandle getBlockApiHandle() {
+        return handle;
+      }
+    };
+
+    BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.set(
+      graph.getConf(), Object.class);
+    LocalBlockRunner.runBlock(graph, block, new Object());
+  }
+}