DRILL-271 Address code review comments. VectorAccessibleSerializable now takes Writabl...
author Steven Phillips <sphillips@maprtech.com>
Fri, 1 Nov 2013 00:29:26 +0000 (17:29 -0700)
committer Steven Phillips <sphillips@maprtech.com>
Fri, 1 Nov 2013 00:34:56 +0000 (17:34 -0700)
exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java

index 62f8097..e5bb94b 100644 (file)
@@ -22,75 +22,87 @@ import com.google.common.collect.Lists;
 import com.yammer.metrics.MetricRegistry;
 import com.yammer.metrics.Timer;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 
 import java.io.*;
 import java.util.List;
 
+/**
+ * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
+ * from an InputStream and construct a new VectorContainer.
+ */
 public class VectorAccessibleSerializable implements DrillSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
   private VectorAccessible va;
+  private WritableBatch batch;
   private BufferAllocator allocator;
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
-  private int incomingRecordCount;
-  private VectorContainer retainedVectorContainer;
-  private SelectionVector2 retainedSelectionVector;
 
   private boolean retain = false;
 
-  /**
-   *
-   * @param va
-   */
-  public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
-    this.va = va;
+  public VectorAccessibleSerializable(BufferAllocator allocator) {
     this.allocator = allocator;
-    incomingRecordCount = va.getRecordCount();
+    this.va = new VectorContainer();
   }
 
-  public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
-    this.va = va;
-    this.allocator = allocator;
-    this.sv2 = sv2;
-    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-    incomingRecordCount = va.getRecordCount();
+  public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocator){
+    this(batch, null, allocator);
   }
 
-  public VectorAccessibleSerializable(BufferAllocator allocator) {
-    this.va = new VectorContainer();
+  /**
+   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will never be released by this class, and ownership
+   * is maintained by caller.
+   * @param batch
+   * @param sv2
+   * @param allocator
+   */
+  public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, BufferAllocator allocator) {
     this.allocator = allocator;
+    if (batch != null) {
+      this.batch = batch;
+    }
+    if (sv2 != null) {
+      this.sv2 = sv2;
+      this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
   }
 
   @Override
   public void read(DataInput input) throws IOException {
     readFromStream(DataInputInputStream.constructInputStream(input));
   }
-  
+
+  /**
+   * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exists
+   * and construct the vectors and add them to a vector container
+   * @param input the InputStream to read from
+   * @throws IOException
+   */
   @Override
   public void readFromStream(InputStream input) throws IOException {
     VectorContainer container = new VectorContainer();
     UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCount();
     if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
-      sv2.allocateNew(recordCount * 2);
-      sv2.getBuffer().setBytes(0, input, recordCount * 2);
+      if (sv2 == null) {
+        sv2 = new SelectionVector2(allocator);
+      }
+      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
       svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
     }
     List<ValueVector> vectorList = Lists.newArrayList();
@@ -116,20 +128,27 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     writeToStream(DataOutputOutputStream.constructOutputStream(output));
   }
 
+  public void writeToStreamAndRetain(OutputStream output) throws IOException {
+    retain = true;
+    writeToStream(output);
+  }
+
+  /**
+   * Serializes the VectorAccessible va and writes it to an output stream
+   * @param output the OutputStream to write to
+   * @throws IOException
+   */
   @Override
   public void writeToStream(OutputStream output) throws IOException {
     Preconditions.checkNotNull(output);
     final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
 
     ByteBuf[] incomingBuffers = batch.getBuffers();
     UserBitShared.RecordBatchDef batchDef = batch.getDef();
 
         /* ByteBuf associated with the selection vector */
     ByteBuf svBuf = null;
-
-        /* Size of the selection vector */
-    int svCount = 0;
+    Integer svCount =  null;
 
     if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
     {
@@ -137,8 +156,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
       svBuf = sv2.getBuffer();
     }
 
-    int totalBufferLength = 0;
-
     try
     {
             /* Write the metadata to the file */
@@ -147,42 +164,20 @@ public class VectorAccessibleSerializable implements DrillSerializable {
             /* If we have a selection vector, dump it to file first */
       if (svBuf != null)
       {
-
-                /* For writing to the selection vectors we use
-                 * setChar() method which does not modify the
-                 * reader and writer index. To copy the entire buffer
-                 * without having to get each byte individually we need
-                 * to set the writer index
-                 */
-        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-//        fc.write(svBuf.nioBuffers());
         svBuf.getBytes(0, output, svBuf.readableBytes());
-        if (!retain) {
-          svBuf.release();
-        }
+        sv2.setBuffer(svBuf);
+        sv2.setRecordCount(svCount);
       }
 
             /* Dump the array of ByteBuf's associated with the value vectors */
       for (ByteBuf buf : incomingBuffers)
       {
-                /* dump the buffer into the file channel */
+                /* dump the buffer into the OutputStream */
         int bufLength = buf.readableBytes();
         buf.getBytes(0, output, bufLength);
-
-                /* compute total length of buffer, will be used when
-                 * we create a compound buffer
-                 */
-        totalBufferLength += buf.readableBytes();
-        if (!retain) {
-          buf.release();
-        }
       }
 
       output.flush();
-      if (retain) {
-        reconstructRecordBatch(batchDef, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
-      }
 
       timerContext.stop();
     } catch (IOException e)
@@ -193,86 +188,16 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     }
   }
 
-  private void reconstructRecordBatch(UserBitShared.RecordBatchDef batchDef,
-                                      ByteBuf[] vvBufs, int totalBufferLength,
-                                      ByteBuf svBuf, int svCount, BatchSchema.SelectionVectorMode svMode)
-  {
-    VectorContainer container = retainedVectorContainer;
-    if (vvBufs.length > 0)    /* If we have ByteBuf's associated with value vectors */
-    {
-
-      CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
-
-            /* Copy data from each buffer into the compound buffer */
-      for (int i = 0; i < vvBufs.length; i++)
-      {
-        cbb.addComponent(vvBufs[i]);
-      }
-
-      List<FieldMetadata> fields = batchDef.getFieldList();
-
-      int bufferOffset = 0;
-
-            /* For each value vector slice up the appropriate size from
-             * the compound buffer and load it into the value vector
-             */
-      int vectorIndex = 0;
-
-      for(VectorWrapper<?> vv : container)
-      {
-        FieldMetadata fmd = fields.get(vectorIndex);
-        ValueVector v = vv.getValueVector();
-        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
-        vectorIndex++;
-        bufferOffset += fmd.getBufferLength();
-      }
-    }
-
-        /* Set the selection vector for the record batch if the
-         * incoming batch had a selection vector
-         */
-    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
-    {
-      if (sv2 == null)
-        sv2 = new SelectionVector2(allocator);
-
-      sv2.setRecordCount(svCount);
-
-            /* create our selection vector from the
-             * incoming selection vector's buffer
-             */
-      sv2.setBuffer(svBuf);
-
-      svBuf.release();
-    }
-
-    container.buildSchema(svMode);
-
-        /* Set the record count in the value vector */
-    for(VectorWrapper<?> v : container)
-    {
-      ValueVector.Mutator m = v.getValueVector().getMutator();
-      m.setValueCount(incomingRecordCount);
-    }
-    retainedVectorContainer = container;
-  }
-
   private void clear() {
+    if (!retain) {
+      batch.clear();
+    }
   }
 
   public VectorAccessible get() {
     return va;
   }
 
-  public void retain(VectorContainer container) {
-    this.retain = true;
-    this.retainedVectorContainer = container;
-  }
-
-  public VectorContainer getRetainedVectorContainer() {
-    return retainedVectorContainer;
-  }
-
   public SelectionVector2 getSv2() {
     return sv2;
   }
index e39b82e..317e705 100644 (file)
@@ -160,7 +160,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         vectorList.add(vw.getValueVector());
       }
 
-      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
 
       mmap.put(mapKey, wrap);
       wrap = null;
@@ -238,8 +239,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
     }
     candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
 
-    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
 
     tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
   }
index 36c390c..b73ddc1 100644 (file)
@@ -116,28 +116,27 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
      * Function is invoked for every record batch and it simply
      * dumps the buffers associated with all the value vectors in
      * this record batch to a log file.
-     *
-     * Function is divided into three main parts
-     *   1. Get all the buffers(ByteBuf's) associated with incoming
-     *      record batch's value vectors and selection vector
-     *   2. Dump these buffers to the log file (performed by writeToFile())
-     *   3. Construct the record batch with these buffers to look exactly like
-     *      the incoming record batch (performed by reconstructRecordBatch())
      */
     @Override
     protected void doWork()
     {
-      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(incoming,
-            incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? incoming.getSelectionVector2() : null,
-            context.getAllocator());
-      wrap.retain(container);
+
+      boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+      if (incomingHasSv2) {
+        sv = incoming.getSelectionVector2();
+      } else {
+        sv = null;
+      }
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
+              incoming, incomingHasSv2 ? true : false);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
 
       try {
-        wrap.writeToStream(fos);
+        wrap.writeToStreamAndRetain(fos);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      sv = wrap.getSv2();
+      batch.reconstructContainer(container);
     }
 
     @Override
index 76b79db..a33ca37 100644 (file)
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
+import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -37,6 +38,7 @@ public class WritableBatch {
 
   private final RecordBatchDef def;
   private final ByteBuf[] buffers;
+  private boolean cleared = false;
 
   private WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) {
     logger.debug("Created new writable batch with def {} and buffers {}", def, buffers);
@@ -58,6 +60,62 @@ public class WritableBatch {
     return buffers;
   }
 
+  public void reconstructContainer(VectorContainer container)
+  {
+    Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+    if (buffers.length > 0)    /* If we have ByteBuf's associated with value vectors */
+    {
+
+      CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
+
+            /* Copy data from each buffer into the compound buffer */
+      for (ByteBuf buf : buffers)
+      {
+        cbb.addComponent(buf);
+      }
+
+      List<FieldMetadata> fields = def.getFieldList();
+
+      int bufferOffset = 0;
+
+            /* For each value vector slice up the appropriate size from
+             * the compound buffer and load it into the value vector
+             */
+      int vectorIndex = 0;
+
+      for(VectorWrapper<?> vv : container)
+      {
+        FieldMetadata fmd = fields.get(vectorIndex);
+        ValueVector v = vv.getValueVector();
+        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+        vectorIndex++;
+        bufferOffset += fmd.getBufferLength();
+      }
+    }
+
+    SelectionVectorMode svMode;
+    if (def.hasIsSelectionVector2() && def.getIsSelectionVector2()) {
+      svMode = SelectionVectorMode.TWO_BYTE;
+    } else {
+      svMode = SelectionVectorMode.NONE;
+    }
+    container.buildSchema(svMode);
+
+        /* Set the record count in the value vector */
+    for(VectorWrapper<?> v : container)
+    {
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(def.getRecordCount());
+    }
+  }
+
+  public void clear() {
+    for (ByteBuf buf : buffers) {
+      buf.release();
+    }
+    cleared = true;
+  }
+
   public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
     List<ValueVector> vectors = Lists.newArrayList();
     for(VectorWrapper<?> vw : vws){
index 94aa3dd..cf274c1 100644 (file)
@@ -24,10 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -68,7 +65,8 @@ public class TestVectorCache {
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
     container.setRecordCount(4);
-    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
 
     DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
     mmap.put("vectors", wrap);
index 11d15d8..fb3e821 100644 (file)
@@ -24,10 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -74,7 +71,8 @@ public class TestWriteToDisk {
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
     container.setRecordCount(4);
-    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
 
     Configuration conf = new Configuration();
     conf.set("fs.name.default", "file:///");