GIRAPH-1018: Improving PartitionStore API to better match its expected behaviour
author Hassan Eslami <heslami@fb.com>
Mon, 29 Jun 2015 22:47:12 +0000 (15:47 -0700)
committer Avery Ching <aching@fb.com>
Mon, 29 Jun 2015 23:29:01 +0000 (16:29 -0700)
(heslami via aching)

Summary: Currently, statistics operations on a partition load the entire partition through PartitionStore's getOrCreatePartition method. This diff improves the PartitionStore API by adding methods that return only the required statistics (the vertex count and edge count of a partition).

Test Plan: mvn clean verify

Reviewers: dionysis.logothetis, maja.kabiljo, avery.ching

Reviewed By: avery.ching

Differential Revision: https://reviews.facebook.net/D40731

13 files changed:
CHANGELOG
giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java

index 2077a2d..c8c80df 100644 (file)
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-1018: Improving PartitionStore API to better match its expected behaviour
+  (heslami via aching)
+
   GIRAPH-1012: Remove giraph-hive (majakabiljo)
 
   GIRAPH-1009: Spammy 'lost reservation' messages from ZooKeeper in workers' log at the end of
index 5342593..0732079 100644 (file)
@@ -30,7 +30,6 @@ import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 import org.apache.giraph.types.ops.TypeOpsUtils;
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
@@ -102,15 +101,14 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
 
     map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<I, ?, ?> partition =
-          service.getPartitionStore().getOrCreatePartition(partitionId);
       Basic2ObjectMap<I, DataInputOutput> partitionMap =
           idTypeOps.create2ObjectOpenHashMap(
-              Math.max(10, (int) partition.getVertexCount()),
+              Math.max(10,
+                  (int) service.getPartitionStore()
+                      .getPartitionVertexCount(partitionId)),
               dataInputOutputWriter);
 
       map.put(partitionId, partitionMap);
-      service.getPartitionStore().putPartition((Partition) partition);
     }
   }
 
index 373389d..a61536f 100644 (file)
@@ -31,7 +31,6 @@ import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 import org.apache.giraph.types.ops.TypeOpsUtils;
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
@@ -106,12 +105,10 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
 
     map = new Int2ObjectOpenHashMap<>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<I, ?, ?> partition =
-          service.getPartitionStore().getOrCreatePartition(partitionId);
       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
-          Math.max(10, (int) partition.getVertexCount()), messageWriter);
+          Math.max(10, (int) service.getPartitionStore()
+              .getPartitionVertexCount(partitionId)), messageWriter);
       map.put(partitionId, partitionMap);
-      service.getPartitionStore().putPartition((Partition) partition);
     }
   }
 
index 0012bf0..a8c19be 100644 (file)
@@ -23,7 +23,6 @@ import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -84,13 +83,11 @@ public class IntByteArrayMessageStore<M extends Writable>
     map =
         new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<IntWritable, Writable, Writable> partition =
-          service.getPartitionStore().getOrCreatePartition(partitionId);
       Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
           new Int2ObjectOpenHashMap<DataInputOutput>(
-              (int) partition.getVertexCount());
+              (int) service.getPartitionStore()
+                  .getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
-      service.getPartitionStore().putPartition(partition);
     }
   }
 
index 8095ad6..7a4ed09 100644 (file)
@@ -33,8 +33,6 @@ import java.util.List;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -74,14 +72,10 @@ public class IntFloatMessageStore
 
     map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      PartitionStore<IntWritable, Writable, Writable> partitionStore =
-        service.getPartitionStore();
-      Partition<IntWritable, Writable, Writable> partition =
-        partitionStore.getOrCreatePartition(partitionId);
-      Int2FloatOpenHashMap partitionMap =
-          new Int2FloatOpenHashMap((int) partition.getVertexCount());
+      Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(
+          (int) service.getPartitionStore()
+              .getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
-      partitionStore.putPartition(partition);
     }
   }
 
index dac98c9..069face 100644 (file)
@@ -33,7 +33,6 @@ import java.util.List;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.giraph.utils.VertexIdMessageIterator;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -74,12 +73,10 @@ public class LongDoubleMessageStore
 
     map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<LongWritable, Writable, Writable> partition =
-          service.getPartitionStore().getOrCreatePartition(partitionId);
-      Long2DoubleOpenHashMap partitionMap =
-          new Long2DoubleOpenHashMap((int) partition.getVertexCount());
+      Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
+          (int) service.getPartitionStore()
+              .getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
-      service.getPartitionStore().putPartition(partition);
     }
   }
 
index 9ee090e..50e8818 100644 (file)
@@ -26,7 +26,6 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.Partition;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -73,13 +72,10 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
 
     map = new Int2ObjectOpenHashMap<>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<LongWritable, Writable, Writable> partition =
-          service.getPartitionStore().getOrCreatePartition(partitionId);
-      Long2ObjectOpenHashMap<T> partitionMap =
-          new Long2ObjectOpenHashMap<T>(
-              (int) partition.getVertexCount());
+      Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
+          (int) service.getPartitionStore()
+              .getPartitionVertexCount(partitionId));
       map.put(partitionId, partitionMap);
-      service.getPartitionStore().putPartition(partition);
     }
   }
 
index 161e363..a75f62b 100644 (file)
@@ -48,7 +48,6 @@ import org.apache.giraph.metrics.GiraphTimer;
 import org.apache.giraph.metrics.GiraphTimerContext;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
-import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
@@ -737,15 +736,10 @@ end[PURE_YARN]*/
     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
     for (Integer partitionId : partitionStore.getPartitionIds()) {
       computePartitionIdQueue.add(partitionId);
-
-      Partition<I, V, E> partition =
-        partitionStore.getOrCreatePartition(partitionId);
-      verticesToCompute += partition.getVertexCount();
-      partitionStore.putPartition(partition);
+      verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
     }
     WorkerProgress.get().startSuperstep(
-        serviceWorker.getSuperstep(),
-        verticesToCompute,
+        serviceWorker.getSuperstep(), verticesToCompute,
         serviceWorker.getPartitionStore().getNumPartitions());
 
     GiraphTimerContext computeAllTimerContext = computeAll.time();
index 98b1fb0..7368420 100644 (file)
@@ -226,6 +226,26 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
+  public long getPartitionVertexCount(int partitionId) {
+    MetaPartition meta = partitions.get(partitionId);
+    if (meta == null) {
+      return 0; // unknown id: report 0, as SimplePartitionStore does
+    }
+    return meta.getState() == State.ONDISK ?
+        meta.getVertexCount() : meta.getPartition().getVertexCount();
+  }
+
+  @Override
+  public long getPartitionEdgeCount(int partitionId) {
+    MetaPartition meta = partitions.get(partitionId);
+    if (meta == null) {
+      return 0; // unknown id: report 0, as SimplePartitionStore does
+    }
+    return meta.getState() == State.ONDISK ?
+        meta.getEdgeCount() : meta.getPartition().getEdgeCount();
+  }
+
+  @Override
   public Partition<I, V, E> getOrCreatePartition(Integer id) {
     MetaPartition meta = new MetaPartition(id);
     MetaPartition temp;
@@ -1093,6 +1113,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     private long vertexCount;
     /** Previous number of vertices contained in the partition */
     private long prevVertexCount;
+    /** Number of edges contained in the partition */
+    private long edgeCount;
     /**
      * Sticky bit; if set, this partition is never supposed to be
      * written to disk
@@ -1115,6 +1137,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       this.references = 0;
       this.vertexCount = 0;
       this.prevVertexCount = 0;
+      this.edgeCount = 0;
       this.isSticky = false;
 
       this.partition = null;
@@ -1143,6 +1166,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       this.state = State.ONDISK;
       this.partition = null;
       this.vertexCount = partition.getVertexCount();
+      this.edgeCount = partition.getEdgeCount();
     }
 
     /**
@@ -1213,6 +1237,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     }
 
     /**
+     * @return the edge count stored in this meta-partition
+     */
+    public long getEdgeCount() {
+      return edgeCount;
+    }
+
+    /**
      * @param inc amount to add to the vertex count
      */
     public void addToVertexCount(long inc) {
@@ -1259,6 +1290,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       sb.append("Number of References: " + references + "; ");
       sb.append("Number of Vertices: " + vertexCount + "; ");
       sb.append("Previous number of Vertices: " + prevVertexCount + "; ");
+      sb.append("Number of edges: " + edgeCount + "; ");
       sb.append("Is Sticky: " + isSticky + "; ");
       sb.append("Partition: " + partition + "; }");
 
index fdc20a5..bbcdcba 100644 (file)
@@ -96,6 +96,20 @@ public abstract class PartitionStore<I extends WritableComparable,
   public abstract int getNumPartitions();
 
   /**
+   * Return the number of vertices in a partition.
+   * @param partitionId Partition id
+   * @return The number of vertices in the specified partition
+   */
+  public abstract long getPartitionVertexCount(int partitionId);
+
+  /**
+   * Return the number of edges in a partition.
+   * @param partitionId Partition id
+   * @return The number of edges in the specified partition
+   */
+  public abstract long getPartitionEdgeCount(int partitionId);
+
+  /**
    * Whether the partition store is empty.
    *
    * @return True iff there are no partitions in the store
index 79c18c3..8ed6081 100644 (file)
@@ -110,5 +110,25 @@ public class SimplePartitionStore<I extends WritableComparable,
   }
 
   @Override
+  public long getPartitionVertexCount(int partitionId) {
+    Partition partition = partitions.get(partitionId);
+    // An id with no stored partition contributes zero vertices.
+    if (partition == null) {
+      return 0;
+    }
+    return partition.getVertexCount();
+  }
+
+  @Override
+  public long getPartitionEdgeCount(int partitionId) {
+    Partition partition = partitions.get(partitionId);
+    // An id with no stored partition contributes zero edges.
+    if (partition == null) {
+      return 0;
+    }
+    return partition.getEdgeCount();
+  }
+
+  @Override
   public void putPartition(Partition<I, V, E> partition) { }
 }
index 6b74478..ed9a492 100644 (file)
@@ -704,17 +704,15 @@ else[HADOOP_NON_SECURE]*/
     // if necessary
     List<PartitionStats> partitionStatsList =
         new ArrayList<PartitionStats>();
-    for (Integer partitionId : getPartitionStore().getPartitionIds()) {
-      Partition<I, V, E> partition =
-          getPartitionStore().getOrCreatePartition(partitionId);
+    PartitionStore<I, V, E> partitionStore = getPartitionStore();
+    for (Integer partitionId : partitionStore.getPartitionIds()) {
       PartitionStats partitionStats =
-          new PartitionStats(partition.getId(),
-              partition.getVertexCount(),
+          new PartitionStats(partitionId,
+              partitionStore.getPartitionVertexCount(partitionId),
               0,
-              partition.getEdgeCount(),
+              partitionStore.getPartitionEdgeCount(partitionId),
               0, 0);
       partitionStatsList.add(partitionStats);
-      getPartitionStore().putPartition(partition);
     }
     workerGraphPartitioner.finalizePartitionStats(
         partitionStatsList, getPartitionStore());
@@ -1121,10 +1119,7 @@ else[HADOOP_NON_SECURE]*/
     long verticesToStore = 0;
     PartitionStore<I, V, E> partitionStore = getPartitionStore();
     for (int partitionId : partitionStore.getPartitionIds()) {
-      Partition<I, V, E> partition =
-        partitionStore.getOrCreatePartition(partitionId);
-      verticesToStore += partition.getVertexCount();
-      partitionStore.putPartition(partition);
+      verticesToStore += partitionStore.getPartitionVertexCount(partitionId);
     }
     WorkerProgress.get().startStoring(
         verticesToStore, getPartitionStore().getNumPartitions());
index 7605fb5..88e66a6 100644 (file)
@@ -288,10 +288,8 @@ public class TestPartitionStores {
     int totalEdges = 0;
     Partition<IntWritable, IntWritable, NullWritable> partition;
     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
-      partition = store.getOrCreatePartition(i);
-      totalVertexes += partition.getVertexCount();
-      totalEdges += partition.getEdgeCount();
-      store.putPartition(partition);
+      totalVertexes += store.getPartitionVertexCount(i);
+      totalEdges += store.getPartitionEdgeCount(i);
     }
     assert vertexCounter.get() == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD;
     assert totalVertexes == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD;
@@ -412,16 +410,10 @@ public class TestPartitionStores {
     assertTrue(partitionStore.hasPartition(2));
     assertFalse(partitionStore.hasPartition(3));
     assertTrue(partitionStore.hasPartition(4));
-    partition = partitionStore.getOrCreatePartition(1);
-    assertEquals(3, partition.getVertexCount());
-    partitionStore.putPartition(partition);
-    partition = partitionStore.getOrCreatePartition(2);
-    assertEquals(2, partition.getVertexCount());
-    partitionStore.putPartition(partition);
-    partition = partitionStore.getOrCreatePartition(4);
-    assertEquals(1, partition.getVertexCount());
-    assertEquals(2, partition.getEdgeCount());
-    partitionStore.putPartition(partition);
+    assertEquals(3, partitionStore.getPartitionVertexCount(1));
+    assertEquals(2, partitionStore.getPartitionVertexCount(2));
+    assertEquals(1, partitionStore.getPartitionVertexCount(4));
+    assertEquals(2, partitionStore.getPartitionEdgeCount(4));
     partitionStore.deletePartition(2);
     assertEquals(2, partitionStore.getNumPartitions());
   }