[GIRAPH 1013] Adding TestGraphUtils and NumericTestGraph
authorIgor Kabiljo <ikabiljo@fb.com>
Tue, 23 Jun 2015 04:44:49 +0000 (21:44 -0700)
committerIgor Kabiljo <ikabiljo@fb.com>
Fri, 26 Jun 2015 00:40:18 +0000 (17:40 -0700)
Summary:
Adding simplified framework for running application tests.
Code for testing is going to be much shorter, especially
when Java 8 is used.

Only difference compared to our codebase is addition of
SendingMessagesTest to showcase these capabilities

Test Plan: mvn clean install

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D40533

14 files changed:
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java [new file with mode: 0644]
giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java [new file with mode: 0644]
giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java [new file with mode: 0644]
giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java [new file with mode: 0644]
giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java [new file with mode: 0644]
giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java

diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java
new file mode 100644 (file)
index 0000000..4886c80
--- /dev/null
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wraps TestGraph to allow using numbers to create and inspect the graph,
+ * instead of needing to have actual Writable values, which don't have
+ * auto-boxing.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class NumericTestGraph<I extends WritableComparable,
+    V extends Writable,
+    E extends Writable> {
+
+  private static final Logger LOG = Logger.getLogger(NumericTestGraph.class);
+
+  private final TestGraph<I, V, E> testGraph;
+  private final Function<Number, I> numberToVertexId;
+  private final Function<Number, V> numberToVertexValue;
+  private final Function<Number, E> numberToEdgeValue;
+
+  public NumericTestGraph(TestGraph<I, V, E> testGraph) {
+    this.testGraph = testGraph;
+    numberToVertexId =
+        numericConvForType(testGraph.getConf().getVertexIdClass());
+    numberToVertexValue =
+        numericConvForType(testGraph.getConf().getVertexValueClass());
+    numberToEdgeValue =
+        numericConvForType(testGraph.getConf().getEdgeValueClass());
+    Preconditions.checkState(this.numberToVertexId != null);
+  }
+
+  public NumericTestGraph(GiraphConfiguration conf) {
+    this(new TestGraph<I, V, E>(conf));
+  }
+
+  public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
+    return testGraph.getConf();
+  }
+
+  public TestGraph<I, V, E> getTestGraph() {
+    return testGraph;
+  }
+
+
+  /**
+   * Get Vertex for a given id.
+   */
+  public Vertex<I, V, E> getVertex(Number vertexId) {
+    return testGraph.getVertex(numberToVertexId(vertexId));
+  }
+
+  /**
+   * Get Vertex Value for a given id.
+   */
+  public V getValue(Number vertexId) {
+    return testGraph.getVertex(numberToVertexId(vertexId)).getValue();
+  }
+
+  /**
+   * Get number of vertices in the graph
+   */
+  public int getVertexCount() {
+    return testGraph.getVertices().size();
+  }
+
+  /**
+   * Add Vertex with a given id to the graph, initializing it to
+   * default vertex value and no edges.
+   */
+  public void addVertex(Number vertexId) {
+    addVertex(vertexId, (Number) null);
+  }
+
+  /**
+   * Add Vertex with a given id and a given Vertex Value to the graph,
+   * initializing it to have no edges.
+   */
+  public void addVertex(Number vertexId, Number vertexValue) {
+    addVertex(vertexId, vertexValue, null);
+  }
+
+  /**
+   * Add Vertex with a given id and a given Vertex Value to the graph,
+   * with listed outgoing edges, all initialized to same provided
+   * {@code edgeValue}.
+   */
+  public void addVertex(Number vertexId, Number vertexValue,
+      Number edgeValue, Number... outEdges) {
+    Vertex<I, V, E> vertex = makeVertex(
+        vertexId, vertexValue, edgeValue, outEdges);
+    testGraph.addVertex(vertex);
+  }
+
+  /**
+   * Add Vertex with a given id and a given Vertex Value to the graph,
+   * initializing it to have no edges.
+   */
+  public void addVertex(Number vertexId, V vertexValue) {
+    addVertex(vertexId, vertexValue, null);
+  }
+
+  /**
+   * Add Vertex with a given id and a given Vertex Value to the graph,
+   * with listed outgoing edges, all initialized to same provided
+   * {@code edgeSupplier}.
+   */
+  public void addVertex(Number vertexId, V vertexValue,
+      Supplier<E> edgeSupplier, Number... outEdges) {
+    Vertex<I, V, E> vertex = makeVertex(
+        vertexId, vertexValue, edgeSupplier, outEdges);
+    testGraph.addVertex(vertex);
+  }
+
+  /**
+   * Add Edge to the graph with default Edge Value, by adding it to
+   * outEdges of {@code fromVertex}, potentially creating {@code fromVertex}
+   * if it doesn't exist.
+   */
+  public void addEdge(Number fromVertex, Number toVertex) {
+    addEdge(fromVertex, toVertex, (Number) null);
+  }
+
+  /**
+   * Add Edge to the graph with provided Edge Value, by adding it to
+   * outEdges of {@code fromVertex}, potentially creating {@code fromVertex}
+   * if it doesn't exist.
+   */
+  public void addEdge(Number fromVertex, Number toVertex, Number edgeValue) {
+    testGraph.addEdge(
+        numberToVertexId(fromVertex),
+        numberToVertexId(toVertex),
+        numberToEdgeValue(edgeValue));
+  }
+
+  /**
+   * Add Edge to the graph with provided Edge Value, by adding it to
+   * outEdges of {@code fromVertex}, potentially creating {@code fromVertex}
+   * if it doesn't exist.
+   */
+  public void addEdge(Number fromVertex, Number toVertex, E edgeValue) {
+    testGraph.addEdge(
+        numberToVertexId(fromVertex), numberToVertexId(toVertex), edgeValue);
+  }
+
+  /**
+   * Add symmetric Edge to the graph with default Edge Value, by adding it to
+   * outEdges of vertices on both ends, potentially creating them both,
+   * if they don't exist.
+   */
+  public void addSymmetricEdge(Number fromVertex, Number toVertex) {
+    addEdge(fromVertex, toVertex);
+    addEdge(toVertex, fromVertex);
+  }
+
+  /**
+   * Add symmetric Edge to the graph with provided Edge Value, by adding it to
+   * outEdges of vertices on both ends, potentially creating them both,
+   * if they don't exist.
+   */
+  public void addSymmetricEdge(
+      Number fromVertex, Number toVertex, Number edgeValue) {
+    addEdge(fromVertex, toVertex, edgeValue);
+    addEdge(toVertex, fromVertex, edgeValue);
+  }
+
+  /**
+   * Add symmetric Edge to the graph with provided Edge Value, by adding it to
+   * outEdges of vertices on both ends, potentially creating them both,
+   * if they don't exist.
+   */
+  public void addSymmetricEdge(Number vertexId, Number toVertex, E edgeValue) {
+    addEdge(vertexId, toVertex, edgeValue);
+    addEdge(toVertex, vertexId, edgeValue);
+  }
+
+  /**
+   * Creates a new Vertex object, without adding it into the graph.
+   *
+   * This function is safe to call from multiple threads at the same time,
+   * and then synchronize only on actual addition of Vertex to the graph
+   * itself.
+   */
+  public Vertex<I, V, E> makeVertex(
+      Number vertexId, V vertexValue,
+      Entry<? extends Number, ? extends Number>... edges) {
+    Vertex<I, V, E> vertex = getConf().createVertex();
+    List<Edge<I, E>> edgesList = new ArrayList<>();
+
+    int i = 0;
+    for (Entry<? extends Number, ? extends Number> edge: edges) {
+      edgesList.add(EdgeFactory.create(
+        numberToVertexId(edge.getKey()),
+        numberToEdgeValue(edge.getValue())));
+      i++;
+    }
+    vertex.initialize(
+            numberToVertexId(vertexId),
+            vertexValue != null ?
+              vertexValue : getConf().createVertexValue(),
+            edgesList);
+    return vertex;
+  }
+
+  /**
+   * Creates a new Vertex object, without adding it into the graph.
+   *
+   * This function is safe to call from multiple threads at the same time,
+   * and then synchronize only on actual addition of Vertex to the graph
+   * itself.
+   */
+  public Vertex<I, V, E> makeVertex(
+      Number vertexId, V vertexValue,
+      Supplier<E> edgeSupplier, Number... edges) {
+    Vertex<I, V, E> vertex = getConf().createVertex();
+
+    List<Edge<I, E>> edgesList = new ArrayList<>();
+    for (Number edge: edges) {
+      edgesList.add(
+          EdgeFactory.create(numberToVertexId.apply(edge),
+          edgeSupplier != null ?
+            edgeSupplier.get() : getConf().createEdgeValue()));
+    }
+
+    vertex.initialize(
+        numberToVertexId.apply(vertexId),
+        vertexValue != null ?
+          vertexValue : getConf().createVertexValue(),
+        edgesList);
+    return vertex;
+  }
+
+  /**
+   * Creates a new Vertex object, without adding it into the graph.
+   *
+   * This function is safe to call from multiple threads at the same time,
+   * and then synchronize only on actual addition of Vertex to the graph
+   * itself.
+   */
+  public Vertex<I, V, E> makeVertex(
+      Number vertexId, Number value,
+      Number edgeValue, Number... edges) {
+    Vertex<I, V, E> vertex = getConf().createVertex();
+
+    List<Edge<I, E>> edgesList = new ArrayList<>();
+    for (Number edge: edges) {
+      edgesList.add(
+          EdgeFactory.create(numberToVertexId.apply(edge),
+          numberToEdgeValue(edgeValue)));
+    }
+
+    vertex.initialize(
+        numberToVertexId.apply(vertexId),
+        numberToVertexValue(value),
+        edgesList);
+    return vertex;
+  }
+
+  public I numberToVertexId(Number value) {
+    return numberToVertexId.apply(value);
+  }
+
+  public V numberToVertexValue(Number value) {
+    return value != null ?
+      numberToVertexValue.apply(value) : getConf().createVertexValue();
+  }
+
+  public E numberToEdgeValue(Number edgeValue) {
+    return edgeValue != null ?
+      numberToEdgeValue.apply(edgeValue) : getConf().createEdgeValue();
+  }
+
+  public Vertex<I, V, E> createVertex() {
+    return getConf().createVertex();
+  }
+
+  public void initializeVertex(
+          Vertex<I, V, E> v, I id, Supplier<V> valueSupplier,
+          List<Edge<I, E>> edgesList) {
+    v.initialize(
+            id,
+            valueSupplier != null ?
+              valueSupplier.get() : getConf().createVertexValue(),
+            edgesList != null ? edgesList : new ArrayList<Edge<I, E>>());
+  }
+
+  @Override
+  public String toString() {
+    return testGraph.toString();
+  }
+
+
+  private static Function<Number, IntWritable> numberToInt() {
+    return new Function<Number, IntWritable>() {
+      @Override
+      public IntWritable apply(Number input) {
+        return new IntWritable(input.intValue());
+      }
+    };
+  }
+
+  private static Function<Number, LongWritable> numberToLong() {
+    return new Function<Number, LongWritable>() {
+      @Override
+      public LongWritable apply(Number input) {
+        return new LongWritable(input.longValue());
+      }
+    };
+  }
+
+  private static Function<Number, DoubleWritable> numberToDouble() {
+    return new Function<Number, DoubleWritable>() {
+      @Override
+      public DoubleWritable apply(Number input) {
+        return new DoubleWritable(input.doubleValue());
+      }
+    };
+  }
+
+  private static Function<Number, FloatWritable> numberToFloat() {
+    return new Function<Number, FloatWritable>() {
+      @Override
+      public FloatWritable apply(Number input) {
+        return new FloatWritable(input.floatValue());
+      }
+    };
+  }
+
+  private static <T> Function<Number, T> numericConvForType(Class<T> type) {
+    if (type.equals(LongWritable.class)) {
+      return (Function) numberToLong();
+    } else if (type.equals(IntWritable.class)) {
+      return (Function) numberToInt();
+    } else if (type.equals(DoubleWritable.class)) {
+      return (Function) numberToDouble();
+    } else if (type.equals(FloatWritable.class)) {
+      return (Function) numberToFloat();
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java
new file mode 100644 (file)
index 0000000..c7b2bc5
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Test graph checker, function that checks whether output
+ * of a test is correct.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public interface TestGraphChecker<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  void checkOutput(NumericTestGraph<I, V, E> graph);
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java
new file mode 100644 (file)
index 0000000..1017590
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Test graph modifier, function that initializes graph
+ * (i.e. creates edges and vertices) for a given test.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public interface TestGraphModifier<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  void modifyGraph(NumericTestGraph<I, V, E> graph);
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java
new file mode 100644 (file)
index 0000000..15bf434
--- /dev/null
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup;
+
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.BulkConfigurator;
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.conf.BooleanConfOption;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.function.Supplier;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Utility functions for running TestGraph unit tests.
+ */
+public class TestGraphUtils {
+  /** modify locally for running full Digraph tests from IDE */
+  public static final
+  BooleanConfOption USE_FULL_GIRAPH_ENV_IN_TESTS = new BooleanConfOption(
+      "giraph.blocks.test_setup.use_full_giraph_env_in_tests", false,
+      "Whether to use full giraph environemnt for tests, " +
+      "or only local implementation");
+
+  // if you want to check stability of the test and make sure it passes always
+  // test it with larger number, like ~10.
+  private static int TEST_REPEAT_TIMES = 1;
+
+  private TestGraphUtils() { }
+
+  /**
+   * Creates configuration using configurator, initializes the graph using
+   * graphInitializer, and checks it via graphChecker.
+   *
+   * Supports using TEST_REPEAT_TIMES for running the same test multiple times.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  void runTest(
+      final TestGraphModifier<? super I, ? super V, ? super E> graphInitializer,
+      final TestGraphChecker<? super I, ? super V, ? super E> graphChecker,
+      final BulkConfigurator configurator) throws Exception {
+    repeat(
+        repeatTimes(),
+        new OneTest() {
+          @Override
+          public void test() throws Exception {
+            GiraphConfiguration conf = new GiraphConfiguration();
+            configurator.configure(conf);
+            BlockUtils.initAndCheckConfig(conf);
+            runTest(graphInitializer, graphChecker, conf);
+          }
+        });
+  }
+
+  /**
+   * Uses provided configuration, initializes the graph using
+   * graphInitializer, and checks it via graphChecker.
+   */
+  public static
+  <I extends WritableComparable, E extends Writable, V extends Writable>
+  void runTest(
+      TestGraphModifier<? super I, ? super V, ? super E> graphInitializer,
+      TestGraphChecker<? super I, ? super V, ? super E> graphChecker,
+      GiraphConfiguration conf) throws Exception {
+    NumericTestGraph<I, V, E> graph = new NumericTestGraph<>(conf);
+    graphInitializer.modifyGraph((NumericTestGraph) graph);
+    runTest(graph, graphChecker);
+  }
+
+  /**
+   * Base of runTest. Takes a created graph, a graph-checker and conf and runs
+   * the test.
+   */
+  public static
+  <I extends WritableComparable, E extends Writable, V extends Writable>
+  void runTest(
+      NumericTestGraph<I, V, E> graph,
+      TestGraphChecker<? super I, ? super V, ? super E> graphChecker
+  ) throws Exception {
+    graph = new NumericTestGraph<I, V, E>(
+      LocalBlockRunner.runApp(
+          graph.getTestGraph(), useFullDigraphTests(graph.getConf())));
+    if (graphChecker != null) {
+      graphChecker.checkOutput((NumericTestGraph) graph);
+    }
+  }
+
+  /**
+   * Chain execution of multiple TestGraphModifier into one.
+   */
+  @SafeVarargs
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  TestGraphModifier<I, V, E> chainModifiers(
+          final TestGraphModifier<I, V, E>... graphModifiers) {
+    return new TestGraphModifier<I, V, E>() {
+      @Override
+      public void modifyGraph(
+          NumericTestGraph<I, V, E> graph) {
+        for (TestGraphModifier<I, V, E> graphModifier : graphModifiers) {
+          graphModifier.modifyGraph(graph);
+        }
+      }
+    };
+  }
+
+  /**
+   * Chain execution of multiple BulkConfigurators into one.
+   *
+   * Order might matter, if they are setting the same fields.
+   * (later one will override what previous one already set).
+   */
+  public static BulkConfigurator chainConfigurators(
+      final BulkConfigurator... configurators) {
+    return new BulkConfigurator() {
+      @Override
+      public void configure(GiraphConfiguration conf) {
+        for (BulkConfigurator configurator : configurators) {
+          configurator.configure(conf);
+        }
+      }
+    };
+  }
+
+
+  public static Supplier<DoubleWritable> doubleSupplier(final double value) {
+    return new Supplier<DoubleWritable>() {
+      @Override
+      public DoubleWritable get() {
+        return new DoubleWritable(value);
+      }
+    };
+  }
+
+  public static Supplier<NullWritable> nullSupplier() {
+    return new Supplier<NullWritable>() {
+      @Override
+      public NullWritable get() {
+        return NullWritable.get();
+      }
+    };
+  }
+
+  /** Interface for running a single test that can throw an exception */
+  interface OneTest {
+    void test() throws Exception;
+  }
+
+  private static void repeat(int times, OneTest test) throws Exception {
+    if (times == 1) {
+      test.test();
+    } else {
+      int failures = 0;
+      StringBuilder failureMsgs = new StringBuilder();
+      AssertionError firstError = null;
+      for (int i = 0; i < times; i++) {
+        try {
+          test.test();
+        } catch (AssertionError error) {
+          failures++;
+          failureMsgs.append("\n").append(error.getMessage());
+          if (firstError == null) {
+            firstError = error;
+          }
+        }
+      }
+
+      if (failures > 0) {
+        throw new AssertionError(
+            "Failed " + failures + " times out of " + times +
+            " runs, messages: " + failureMsgs,
+            firstError);
+      }
+    }
+  }
+
+  private static boolean useFullDigraphTests(GiraphConfiguration conf) {
+    return USE_FULL_GIRAPH_ENV_IN_TESTS.get(conf) ||
+        System.getProperty("test_setup.UseFullGiraphEnvInTests") != null;
+  }
+
+  private static int repeatTimes() {
+    String value = System.getProperty("test_setup.TestRepeatTimes");
+    return value != null ? Integer.parseInt(value) : TEST_REPEAT_TIMES;
+  }
+
+  public static void setTestRepeatTimes(int testRepeatTimes) {
+    TestGraphUtils.TEST_REPEAT_TIMES = testRepeatTimes;
+  }
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java
new file mode 100644 (file)
index 0000000..d51d819
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
+
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Traverse each Vertex in the graph, and initialize it with a given
+ * consumer function.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class EachVertexInit<I extends WritableComparable, V extends Writable,
+    E extends Writable> implements TestGraphModifier<I, V, E> {
+  private final Consumer<Vertex<I, V, E>> vertexConsumer;
+
+  public EachVertexInit(Consumer<Vertex<I, V, E>> vertexConsumer) {
+    this.vertexConsumer = vertexConsumer;
+  }
+
+  @Override
+  public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+    for (Vertex<I, V, E> vertex : graph.getTestGraph()) {
+      vertexConsumer.apply(vertex);
+    }
+  }
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java
new file mode 100644 (file)
index 0000000..ecec024
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
+
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.function.Supplier;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Create a network that looks like:
+ *   1      5
+ *  / \    / \    6
+ * 0---2--3---4
+ *
+ * where 6 is disconnected from the rest of the network.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class Small1GraphInit<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+
+  private final Supplier<V> valueSupplier;
+  private final Supplier<E> edgeSupplier;
+
+  public Small1GraphInit(
+      Supplier<V> valueSupplier, Supplier<E> edgeSupplier) {
+    this.valueSupplier = valueSupplier;
+    this.edgeSupplier = edgeSupplier;
+  }
+
+  @Override
+  public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+    graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2);
+    graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2);
+    graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1, 3);
+    graph.addVertex(3, valueSupplier.get(), edgeSupplier, 2, 4, 5);
+    graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5);
+    graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4);
+    graph.addVertex(6, valueSupplier.get(), edgeSupplier);
+  }
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java
new file mode 100644 (file)
index 0000000..eb38c45
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
+
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.function.Supplier;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Create a network that looks like:
+ *   1      5
+ *  / \    / \    6
+ * 0---2  3---4
+ *
+ * where 6 is disconnected from the rest of the network.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class Small2GraphInit<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+
+  private final Supplier<V> valueSupplier;
+  private final Supplier<E> edgeSupplier;
+
+  public Small2GraphInit(
+      Supplier<V> valueSupplier, Supplier<E> edgeSupplier) {
+    this.valueSupplier = valueSupplier;
+    this.edgeSupplier = edgeSupplier;
+  }
+
+  @Override
+  public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+    graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2);
+    graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2);
+    graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1);
+    graph.addVertex(3, valueSupplier.get(), edgeSupplier, 4, 5);
+    graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5);
+    graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4);
+    graph.addVertex(6, valueSupplier.get(), edgeSupplier);
+  }
+}
+
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
new file mode 100644 (file)
index 0000000..3de158a
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
+
+import java.util.Random;
+
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.function.Supplier;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Creates synthetic graphs, that can have community structure.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public class SyntheticGraphInit<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements TestGraphModifier<I, V, E> {
+  public static final IntConfOption NUM_COMMUNITIES = new IntConfOption(
+      "test.SyntheticGraphCreator.NUM_COMMUNITIES", -1, "");
+  public static final IntConfOption NUM_VERTICES = new IntConfOption(
+      "test.SyntheticGraphCreator.NUM_VERTICES", -1, "");
+  public static final IntConfOption NUM_EDGES_PER_VERTEX = new IntConfOption(
+      "test.SyntheticGraphCreator.NUM_EDGES_PER_VERTEX", -1, "");
+  public static final FloatConfOption ACTUAL_LOCALITY_RATIO =
+      new FloatConfOption(
+            "test.SyntheticGraphCreator.ACTUAL_LOCALITY_RATIO", -1, "");
+
+  protected final Supplier<E> edgeSupplier;
+
+  public SyntheticGraphInit(Supplier<E> edgeSupplier) {
+    this.edgeSupplier = edgeSupplier;
+  }
+
+  public SyntheticGraphInit() {
+    this.edgeSupplier = null;
+  }
+
+
+  @Override
+  public void modifyGraph(NumericTestGraph<I, V, E> graph) {
+    GiraphConfiguration conf = graph.getConf();
+    int numPartitions = NUM_COMMUNITIES.get(conf);
+    int numVertices = NUM_VERTICES.get(conf);
+    int numEdgesPerVertex = NUM_EDGES_PER_VERTEX.get(conf);
+    int communitySize = numVertices / numPartitions;
+    float actualLocalityRatio = ACTUAL_LOCALITY_RATIO.get(conf);
+    Random random = new Random(42);
+    for (int i = 0; i < numVertices; ++i) {
+      for (int e = 0; e < numEdgesPerVertex / 2; ++e) {
+        boolean localEdge = random.nextFloat() < actualLocalityRatio;
+        int community = i / communitySize;
+        int j;
+        do {
+          if (localEdge) {
+            j = community * communitySize + random.nextInt(communitySize);
+          } else {
+            j = random.nextInt(numVertices);
+          }
+        } while (j == i);
+        graph.addSymmetricEdge(
+            i, j, edgeSupplier != null ? edgeSupplier.get() : null);
+      }
+    }
+
+//    if (vertexModifier != null) {
+//      for (int i = 0; i < numVertices; i++) {
+//        vertexModifier.modifyVertexValue(i, graph.getVertex(i).getValue());
+//      }
+//    }
+  }
+}
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java
new file mode 100644 (file)
index 0000000..e16923b
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Common Graphs for unit tests.
+ */
+package org.apache.giraph.block_app.test_setup.graphs;
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java
new file mode 100644 (file)
index 0000000..763f79e
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for unit tests.
+ */
+package org.apache.giraph.block_app.test_setup;
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java
new file mode 100644 (file)
index 0000000..d4a7c2f
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework;
+
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphChecker;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SendingMessagesTest {
+  @Test
+  public void createVertexOnMsgsTest() throws Exception {
+    TestGraphUtils.runTest(
+        new TestGraphModifier<LongWritable, LongWritable, Writable>() {
+          @Override
+          public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
+            graph.addEdge(1, 2);
+          }
+        },
+        new TestGraphChecker<LongWritable, LongWritable, Writable>() {
+          @Override
+          public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
+            Assert.assertEquals(1, graph.getValue(2).get());
+            Assert.assertEquals(0, graph.getValue(1).get());
+          }
+        },
+        new BulkConfigurator() {
+          @Override
+          public void configure(GiraphConfiguration conf) {
+            BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class);
+          }
+        });
+  }
+
+  @Test
+  public void doNotCreateVertexOnMsgsTest() throws Exception {
+    TestGraphUtils.runTest(
+        new TestGraphModifier<LongWritable, LongWritable, Writable>() {
+          @Override
+          public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
+            graph.addEdge(1, 2);
+          }
+        },
+        new TestGraphChecker<LongWritable, LongWritable, Writable>() {
+          @Override
+          public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
+            Assert.assertNull(graph.getVertex(2));
+            Assert.assertEquals(0, graph.getValue(1).get());
+          }
+        },
+        new BulkConfigurator() {
+          @Override
+          public void configure(GiraphConfiguration conf) {
+            BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class);
+            GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.set(conf, false);
+          }
+        });
+  }
+
+  @Test
+  public void createMultiMsgs() throws Exception {
+    TestGraphUtils.runTest(
+        new TestGraphModifier<LongWritable, LongWritable, Writable>() {
+          @Override
+          public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
+            graph.addSymmetricEdge(1, 2);
+            graph.addSymmetricEdge(3, 2);
+          }
+        },
+        new TestGraphChecker<LongWritable, LongWritable, Writable>() {
+          @Override
+          public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
+            Assert.assertEquals(3, graph.getValue(2).get());
+            Assert.assertEquals(2, graph.getValue(1).get());
+            Assert.assertEquals(2, graph.getValue(3).get());
+          }
+        },
+        new BulkConfigurator() {
+          @Override
+          public void configure(GiraphConfiguration conf) {
+            BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class);
+          }
+        });
+  }
+
+  public static class SendingMessagesToNeighborsBlockFactory extends TestLongNullNullBlockFactory {
+    @Override
+    protected Class<? extends Writable> getVertexValueClass(GiraphConfiguration conf) {
+      return LongWritable.class;
+    }
+
+    @Override
+    public Block createBlock(GiraphConfiguration conf) {
+      return Pieces.sendMessageToNeighbors(
+          "SendToNeighbors",
+          LongWritable.class,
+          VertexSuppliers.<LongWritable, LongWritable, Writable>vertexIdSupplier(),
+          new ConsumerWithVertex<LongWritable, LongWritable, Writable, Iterable<LongWritable>>() {
+            @Override
+            public void apply(Vertex<LongWritable, LongWritable, Writable> vertex,
+                Iterable<LongWritable> messages) {
+              long max = 0;
+              for (LongWritable v : messages) {
+                max = Math.max(max, v.get());
+              }
+              vertex.getValue().set(max);
+            }
+          });
+    }
+  }
+}
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java
new file mode 100644 (file)
index 0000000..927e715
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+public abstract class TestLongNullNullBlockFactory extends AbstractBlockFactory<Object> {
+  @Override
+  protected Class<? extends WritableComparable> getVertexIDClass(GiraphConfiguration conf) {
+    return LongWritable.class;
+  }
+
+  @Override
+  protected Class<? extends Writable> getVertexValueClass(GiraphConfiguration conf) {
+    return NullWritable.class;
+  }
+
+  @Override
+  protected Class<? extends Writable> getEdgeValueClass(GiraphConfiguration conf) {
+    return NullWritable.class;
+  }
+
+  @Override
+  public Object createExecutionStage(GiraphConfiguration conf) {
+    return new Object();
+  }
+}
\ No newline at end of file
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java
new file mode 100644 (file)
index 0000000..a68fca2
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework;
+
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test sending worker to worker messages
+ */
+public class TestWorkerMessages {
+  @Test
+  public void testWorkerMessages() throws Exception {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    BlockUtils.setAndInitBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
+    TestGraph testGraph = new TestGraph(conf);
+    testGraph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get());
+    LocalBlockRunner.runApp(testGraph);
+  }
+
+  @Test
+  public void testWithTestSetup() throws Exception {
+    TestGraphUtils.runTest(
+        new TestGraphModifier<WritableComparable, Writable, Writable>() {
+          @Override
+          public void modifyGraph(NumericTestGraph<WritableComparable, Writable, Writable> graph) {
+            graph.addEdge(1, 2);
+          }
+        },
+        null,
+        new BulkConfigurator() {
+          @Override
+          public void configure(GiraphConfiguration conf) {
+            BlockUtils.setBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class);
+          }
+        });
+  }
+
+  public static class TestWorkerMessagesBlockFactory extends TestLongNullNullBlockFactory {
+    @Override
+    public Block createBlock(GiraphConfiguration conf) {
+      return new SequenceBlock(
+          new TestWorkerMessagesPiece(2, 4, 11),
+          new TestWorkerMessagesPiece(3, 5, 2, 100));
+    }
+  }
+
+  public static class TestWorkerMessagesPiece extends PieceWithWorkerContext<LongWritable,
+      Writable, Writable, NoMessage, Object, LongWritable, Object> {
+    private final HashSet<Long> values;
+
+    public TestWorkerMessagesPiece(long... values) {
+      this.values = new HashSet<>();
+      for (long value : values) {
+        this.values.add(value);
+      }
+    }
+
+    @Override
+    public void workerContextSend(BlockWorkerContextSendApi<LongWritable> workerContextApi,
+        Object executionStage, Object workerValue) {
+      for (long value : values) {
+        workerContextApi.sendMessageToWorker(new LongWritable(value),
+            workerContextApi.getMyWorkerIndex());
+      }
+    }
+
+    @Override
+    public void workerContextReceive(BlockWorkerContextReceiveApi workerContextApi,
+        Object executionStage, Object workerValue, List<LongWritable> workerMessages) {
+      Assert.assertEquals(values.size(), workerMessages.size());
+      for (LongWritable workerMessage : workerMessages) {
+        Assert.assertTrue(values.remove(workerMessage.get()));
+      }
+    }
+  }
+}
index 5e4eb11..fec5b39 100644 (file)
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.giraph.block_app.reducers.array;
 
 import static org.junit.Assert.assertEquals;