Simple reformatting
authorJacques Nadeau <jacques@apache.org>
Fri, 8 Nov 2013 17:20:28 +0000 (09:20 -0800)
committerJacques Nadeau <jacques@apache.org>
Fri, 8 Nov 2013 17:20:28 +0000 (09:20 -0800)
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java

index e857c25..a24ec70 100644 (file)
@@ -29,13 +29,13 @@ import com.google.common.base.Preconditions;
 import java.util.List;
 
 public class TraceBatchCreator implements BatchCreator<Trace> {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
-
-    @Override
-    public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
-        //Preconditions.checkArgument(children.size() == 1);
-        return new TraceRecordBatch(config, children.iterator().next(), context);
-    }
-
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    // Preconditions.checkArgument(children.size() == 1);
+    return new TraceRecordBatch(config, children.iterator().next(), context);
+  }
 
 }
index b73ddc1..1b990c9 100644 (file)
@@ -65,139 +65,125 @@ import org.apache.hadoop.fs.Path;
  * same set of value vectors (and selection vectors) to its parent record
  * batch
  */
-public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
-{
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
-
-    private SelectionVector2 sv = null;
-
-    /* Tag associated with each trace operator */
-    final String traceTag;
-
-    /* Location where the log should be dumped */
-    private final String logLocation;
-
-    /* File descriptors needed to be able to dump to log file */
-    private OutputStream fos;
-
-    public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
-    {
-        super(pop, context, incoming);
-        this.traceTag = pop.traceTag;
-        logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
-
-        String fileName = getFileName();
-
-        /* Create the log file we will dump to and initialize the file descriptors */
-        try
-        {
-          Configuration conf = new Configuration();
-          conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
-          FileSystem fs = FileSystem.get(conf);
-
-            /* create the file */
-          fos = fs.create(new Path(fileName));
-        } catch (IOException e)
-        {
-            logger.error("Unable to create file: " + fileName);
-        }
-    }
+public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+
+  private SelectionVector2 sv = null;
+
+  /* Tag associated with each trace operator */
+  final String traceTag;
+
+  /* Location where the log should be dumped */
+  private final String logLocation;
+
+  /* File descriptors needed to be able to dump to log file */
+  private OutputStream fos;
+
+  public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) {
+    super(pop, context, incoming);
+    this.traceTag = pop.traceTag;
+    logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+    String fileName = getFileName();
+
+    /* Create the log file we will dump to and initialize the file descriptors */
+    try {
+      Configuration conf = new Configuration();
+      conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
+      FileSystem fs = FileSystem.get(conf);
 
-    @Override
-    public int getRecordCount()
-    {
-        if (sv == null)
-            return incoming.getRecordCount();
-        else
-           return sv.getCount();
+      /* create the file */
+      fos = fs.create(new Path(fileName));
+    } catch (IOException e) {
+      logger.error("Unable to create file: " + fileName);
     }
+  }
+
+  @Override
+  public int getRecordCount() {
+    if (sv == null)
+      return incoming.getRecordCount();
+    else
+      return sv.getCount();
+  }
+
+  /**
+   * Function is invoked for every record batch and it simply dumps the buffers associated with all the value vectors in
+   * this record batch to a log file.
+   */
+  @Override
+  protected void doWork() {
+
+    boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+    if (incomingHasSv2) {
+      sv = incoming.getSelectionVector2();
+    } else {
+      sv = null;
+    }
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true
+        : false);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
+
+    try {
+      wrap.writeToStreamAndRetain(fos);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    batch.reconstructContainer(container);
+  }
 
-    /**
-     * Function is invoked for every record batch and it simply
-     * dumps the buffers associated with all the value vectors in
-     * this record batch to a log file.
+  @Override
+  protected void setupNewSchema() throws SchemaChangeException {
+    /* Trace operator does not deal with hyper vectors yet */
+    if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+      throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+    /*
+     * we have a new schema, clear our existing container to load the new value vectors
      */
-    @Override
-    protected void doWork()
-    {
-
-      boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
-      if (incomingHasSv2) {
-        sv = incoming.getSelectionVector2();
-      } else {
-        sv = null;
-      }
-      WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
-              incoming, incomingHasSv2 ? true : false);
-      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
-
-      try {
-        wrap.writeToStreamAndRetain(fos);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      batch.reconstructContainer(container);
-    }
+    container.clear();
 
-    @Override
-    protected void setupNewSchema() throws SchemaChangeException
-    {
-        /* Trace operator does not deal with hyper vectors yet */
-        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
-            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
-        /* we have a new schema, clear our existing container to
-         * load the new value vectors
-         */
-        container.clear();
-
-        /* Add all the value vectors in the container */
-        for(VectorWrapper<?> vv : incoming)
-        {
-            TransferPair tp = vv.getValueVector().getTransferPair();
-            container.add(tp.getTo());
-        }
+    /* Add all the value vectors in the container */
+    for (VectorWrapper<?> vv : incoming) {
+      TransferPair tp = vv.getValueVector().getTransferPair();
+      container.add(tp.getTo());
     }
+  }
 
-    @Override
-    public SelectionVector2 getSelectionVector2() {
-        return sv;
-    }
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return sv;
+  }
 
-    private String getFileName()
-    {
-        /* From the context, get the query id, major fragment id,
-         * minor fragment id. This will be used as the file name
-         * to which we will dump the incoming buffer data
-         */
-        FragmentHandle handle = incoming.getContext().getHandle();
+  private String getFileName() {
+    /*
+     * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
+     * which we will dump the incoming buffer data
+     */
+    FragmentHandle handle = incoming.getContext().getHandle();
 
-        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+    String qid = QueryIdHelper.getQueryId(handle.getQueryId());
 
-        int majorFragmentId = handle.getMajorFragmentId();
-        int minorFragmentId = handle.getMinorFragmentId();
+    int majorFragmentId = handle.getMajorFragmentId();
+    int minorFragmentId = handle.getMinorFragmentId();
 
-        String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+    String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
 
-        return fileName;
-    }
+    return fileName;
+  }
 
+  @Override
+  protected void cleanup() {
+    /* Release the selection vector */
+    if (sv != null)
+      sv.clear();
 
-    @Override
-    protected void cleanup()
-    {
-        /* Release the selection vector */
-        if (sv != null)
-          sv.clear();
-
-        /* Close the file descriptors */
-        try
-        {
-            fos.close();
-        } catch (IOException e)
-        {
-            logger.error("Unable to close file descriptors for file: " + getFileName());
-        }
+    /* Close the file descriptors */
+    try {
+      fos.close();
+    } catch (IOException e) {
+      logger.error("Unable to close file descriptors for file: " + getFileName());
     }
+  }
 
 }
index a33ca37..e9b56db 100644 (file)
@@ -60,17 +60,15 @@ public class WritableBatch {
     return buffers;
   }
 
-  public void reconstructContainer(VectorContainer container)
-  {
-    Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
-    if (buffers.length > 0)    /* If we have ByteBuf's associated with value vectors */
-    {
-
+  public void reconstructContainer(VectorContainer container) {
+    Preconditions.checkState(!cleared,
+        "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+    if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
+      
       CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
 
-            /* Copy data from each buffer into the compound buffer */
-      for (ByteBuf buf : buffers)
-      {
+      /* Copy data from each buffer into the compound buffer */
+      for (ByteBuf buf : buffers) {
         cbb.addComponent(buf);
       }
 
@@ -78,13 +76,12 @@ public class WritableBatch {
 
       int bufferOffset = 0;
 
-            /* For each value vector slice up the appropriate size from
-             * the compound buffer and load it into the value vector
-             */
+      /*
+       * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
+       */
       int vectorIndex = 0;
 
-      for(VectorWrapper<?> vv : container)
-      {
+      for (VectorWrapper<?> vv : container) {
         FieldMetadata fmd = fields.get(vectorIndex);
         ValueVector v = vv.getValueVector();
         v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -101,9 +98,8 @@ public class WritableBatch {
     }
     container.buildSchema(svMode);
 
-        /* Set the record count in the value vector */
-    for(VectorWrapper<?> v : container)
-    {
+    /* Set the record count in the value vector */
+    for (VectorWrapper<?> v : container) {
       ValueVector.Mutator m = v.getValueVector().getMutator();
       m.setValueCount(def.getRecordCount());
     }
@@ -118,23 +114,24 @@ public class WritableBatch {
 
   public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
     List<ValueVector> vectors = Lists.newArrayList();
-    for(VectorWrapper<?> vw : vws){
+    for (VectorWrapper<?> vw : vws) {
       Preconditions.checkArgument(!vw.isHyper());
       vectors.add(vw.getValueVector());
     }
     return getBatchNoHV(recordCount, vectors, isSV2);
   }
-  
+
   public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
 
     for (ValueVector vv : vectors) {
       metadata.add(vv.getMetadata());
-      
-      // don't try to get the buffers if we don't have any records.  It is possible the buffers are dead buffers.
-      if(recordCount == 0) continue;
-      
+
+      // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
+      if (recordCount == 0)
+        continue;
+
       for (ByteBuf b : vv.getBuffers()) {
         buffers.add(b);
       }
@@ -142,14 +139,15 @@ public class WritableBatch {
       vv.clear();
     }
 
-    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).setIsSelectionVector2(isSV2).build();
+    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
+        .setIsSelectionVector2(isSV2).build();
     WritableBatch b = new WritableBatch(batchDef, buffers);
     return b;
   }
-  
+
   public static WritableBatch get(RecordBatch batch) {
-    if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
-        throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
+    if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+      throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
 
     boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
     return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);