Add PrepareGraphPieces.isSymmetricBlock to check for a symmetric graph
authorYuri Schimke <yuri@schimke.ee>
Thu, 25 Feb 2016 18:42:21 +0000 (10:42 -0800)
committerMaja Kabiljo <majakabiljo@fb.com>
Thu, 25 Feb 2016 18:42:21 +0000 (10:42 -0800)
Summary:
PrepareGraphPieces.isSymmetricBlock is a reusable factory function
for creating blocks that check if a graph is symmetric by
XOR reducing a preditable hash of the edges pairs (V1, V2)

Test Plan:
Unit Tests for all changed files, testing on demo graphs.
Will run a full test job.

Reviewers: spupyrev, dionysis.logothetis, ikabiljo, maja.kabiljo

Reviewed By: maja.kabiljo

Subscribers: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D54411

giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java
giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java [new file with mode: 0644]
giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java [new file with mode: 0644]
giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java [new file with mode: 0644]
giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java [new file with mode: 0644]

index b4d40dc..41c315a 100644 (file)
  */
 package org.apache.giraph.block_app.library.prepare_graph;
 
+import com.google.common.hash.Funnel;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import java.util.Iterator;
-
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
@@ -42,8 +45,11 @@ import org.apache.giraph.function.ObjectTransfer;
 import org.apache.giraph.function.primitive.Int2ObjFunction;
 import org.apache.giraph.function.primitive.Obj2DoubleFunction;
 import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.object.MultiSizedReusable;
 import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.reducers.impl.LongXorReduce;
 import org.apache.giraph.types.NoMessage;
 import org.apache.giraph.types.ops.NumericTypeOps;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
@@ -397,4 +403,84 @@ public class PrepareGraphPieces {
       }),
       sumEdgeWeights);
   }
+
+  /**
+   * isSymmetricBlock using a sensible default HashFunction
+   *
+   * @see Hashing#murmur3_128()
+   * @see #isSymmetricBlock(Funnel, Consumer, HashFunction)
+   */
+  public static <I extends WritableComparable> Block isSymmetricBlock(
+      Funnel<I> idHasher,
+      Consumer<Boolean> consumer) {
+    return isSymmetricBlock(idHasher, consumer, Hashing.murmur3_128());
+  }
+
+  /**
+   * Checks whether a graph is symmetric and returns the result to a consumer.
+   *
+   * @param idHasher Allows Vertex ids to submit themselves to hashing
+   *                 without artificially converting to an intermediate
+   *                 type e.g. Long or String.
+   * @param consumer the return store for whether the graph is symmetric
+   * @param <I> the type of Vertex id
+   * @return block that checks for symmetric graphs
+   */
+  public static <I extends WritableComparable> Block isSymmetricBlock(
+      Funnel<I> idHasher,
+      Consumer<Boolean> consumer,
+      HashFunction hashFunction) {
+
+    SupplierFromVertex<I, Writable, Writable, LongWritable>
+        s = (vertex) -> new LongWritable(
+          xorEdges(vertex, idHasher, hashFunction));
+
+    Consumer<LongWritable> longConsumer =
+        (xorValue) -> consumer.apply(xorValue.get() == 0L);
+
+    return Pieces.reduce(
+        "HashEdges",
+        LongXorReduce.INSTANCE,
+        s,
+        longConsumer
+    );
+  }
+
+  /**
+   * Predictably XOR all edges for a single vertex. The value to be
+   * XORed is (smaller(v1|v2), larger(v1|v2)) and skipping self-loops
+   * since we want to detect asymmetric graphs.
+   *
+   * Uses a HashFunction to get high collision prevention and bit dispersion.
+   *
+   * @see HashFunction
+   */
+  private static <I extends WritableComparable> long xorEdges(
+      Vertex<I, Writable, Writable> vertex,
+      Funnel<I> idHasher, HashFunction hashFunction) {
+    long result = 0L;
+
+    for (Edge<I, Writable> e : vertex.getEdges()) {
+      Hasher h = hashFunction.newHasher();
+
+      I thisVertexId = vertex.getId();
+      I thatVertexId = e.getTargetVertexId();
+
+      int comparison = thisVertexId.compareTo(thatVertexId);
+
+      if (comparison != 0) {
+        if (comparison < 0) {
+          idHasher.funnel(thisVertexId, h);
+          idHasher.funnel(thatVertexId, h);
+        } else {
+          idHasher.funnel(thatVertexId, h);
+          idHasher.funnel(thisVertexId, h);
+        }
+
+        result ^= h.hash().asLong();
+      }
+    }
+
+    return result;
+  }
 }
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java
new file mode 100644 (file)
index 0000000..ee2e983
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.giraph.block_app.test_setup.graphs.SmallDirectedTreeGraphInit;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.function.ObjectHolder;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.utils.hashing.LongWritableFunnel;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSymmetryCheck {
+  private long IGNORED = -1L;
+
+  private NumericTestGraph<LongWritable, LongWritable, LongWritable> graph;
+  private ObjectHolder<Boolean> holder = new ObjectHolder<>();
+  private Block isBlock = PrepareGraphPieces.isSymmetricBlock(LongWritableFunnel.INSTANCE, holder);
+
+  @Before
+  public void initConf() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class);
+    GiraphConstants.EDGE_VALUE_CLASS.set(conf, LongWritable.class);
+
+    graph = new NumericTestGraph<>(conf);
+  }
+
+  @Test
+  public void testSimpleLoop() throws Exception {
+    graph.addVertex(0l, IGNORED, 0l);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object());
+
+    assertTrue(holder.get());
+  }
+
+  @Test
+  public void testSimpleAsymmetric() throws Exception {
+    graph.addVertex(0l, IGNORED, IGNORED, 1l);
+    graph.addVertex(1l, IGNORED);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object());
+
+    assertFalse(holder.get());
+  }
+
+  @Test
+  public void testSimpleSymmetric() throws Exception {
+    graph.addVertex(0l, IGNORED, IGNORED, 1l);
+    graph.addVertex(1l, IGNORED, IGNORED, 0l);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object());
+
+    assertTrue(holder.get());
+  }
+
+  @Test
+  public void testSmall1Graph() throws Exception {
+    Small1GraphInit i = new Small1GraphInit<LongWritable, LongWritable, LongWritable>();
+    i.modifyGraph(graph);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object());
+
+    assertTrue(holder.get());
+  }
+
+  @Test
+  public void testSmallDirectedTreeAsymmetric() throws Exception {
+    SmallDirectedTreeGraphInit i = new SmallDirectedTreeGraphInit<LongWritable, LongWritable, LongWritable>();
+    i.modifyGraph(graph);
+
+    LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object());
+
+    assertFalse(holder.get());
+  }
+
+  @Test
+  public void testSmallDirectedTreeSymmetric() throws Exception {
+    SmallDirectedTreeGraphInit i = new SmallDirectedTreeGraphInit<LongWritable, LongWritable, LongWritable>();
+    i.modifyGraph(graph);
+
+    Block block =
+        new SequenceBlock(PrepareGraphPieces.makeSymmetricWeighted(LongTypeOps.INSTANCE, LongTypeOps.INSTANCE),
+            isBlock);
+    LocalBlockRunner.runBlock(graph.getTestGraph(), block, new Object());
+
+    assertTrue(holder.get());
+  }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java
new file mode 100644 (file)
index 0000000..741e278
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.reducers.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.reducers.ReduceSameTypeOperation;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * ReduceOperation that XORs (^) values together.
+ */
+public class LongXorReduce extends ReduceSameTypeOperation<LongWritable> {
+  /**
+   * Long XOR, equivalent to l1 ^ l2
+   */
+  public static final LongXorReduce INSTANCE = new LongXorReduce();
+
+  /** Constructor used for deserialization only */
+  public LongXorReduce() {
+  }
+
+  @Override public LongWritable createInitialValue() {
+    return new LongWritable(0L);
+  }
+
+  @Override public LongWritable reduce(LongWritable curValue,
+      LongWritable valueToReduce) {
+    curValue.set(curValue.get() ^ valueToReduce.get());
+    return curValue;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+  }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java b/giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java
new file mode 100644 (file)
index 0000000..59ffcaa
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.hashing;
+
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Hashing strategy for LongWritable. Implemented via an enum per advice
+ * in Funnel.
+ *
+ * @see Funnel
+ */
+public enum LongWritableFunnel implements Funnel<LongWritable> {
+  /** singleton */
+  INSTANCE;
+
+  @Override public void funnel(LongWritable w, PrimitiveSink into) {
+    into.putLong(w.get());
+  }
+}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java b/giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java
new file mode 100644 (file)
index 0000000..e2c35c0
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of utility classes supporting strong hash functions
+ * (currently from Guava).
+ */
+package org.apache.giraph.utils.hashing;
diff --git a/giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java b/giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java
new file mode 100644 (file)
index 0000000..2e9e717
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.reducers.impl;
+
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLongXorReduce {
+  private LongXorReduce xor = LongXorReduce.INSTANCE;
+
+  @Test
+  public void testZero() {
+    Assert.assertEquals(0b0L, xor.createInitialValue().get());
+  }
+
+  @Test
+  public void testXor() {
+    LongWritable curWritable = new LongWritable(0b000L);
+
+    Assert.assertEquals(0b001L, xor.reduce(curWritable, new LongWritable(0b001L)).get());
+    Assert.assertEquals(0b011L, xor.reduce(curWritable, new LongWritable(0b010L)).get());
+    Assert.assertEquals(0b100L, xor.reduce(curWritable, new LongWritable(0b111L)).get());
+  }
+}
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java b/giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java
new file mode 100644 (file)
index 0000000..f528bff
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.hashing;
+
+import com.google.common.hash.PrimitiveSink;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestLongWritableFunnel {
+  @Test
+  public void testWritesALong() {
+    PrimitiveSink sink = Mockito.mock(PrimitiveSink.class);
+
+    LongWritableFunnel.INSTANCE.funnel(new LongWritable(10L), sink);
+
+    Mockito.verify(sink).putLong(10L);
+    Mockito.verifyNoMoreInteractions(sink);
+  }
+}