DRILL-254: Add iterator validator and correct interface violations
author Jacques Nadeau <jacques@apache.org>
Fri, 25 Oct 2013 16:32:44 +0000 (09:32 -0700)
committer Jacques Nadeau <jacques@apache.org>
Sun, 10 Nov 2013 04:19:13 +0000 (20:19 -0800)
12 files changed:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java [new file with mode: 0644]
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java [new file with mode: 0644]
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java [new file with mode: 0644]
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java [new file with mode: 0644]
exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java

index 9e2ef0a..d1188bf 100644 (file)
@@ -141,6 +141,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitIteratorValidator(IteratorValidator op, X value) throws E {
+    return visitOp(op, value);
+  }
+
+  @Override
   public T visitOp(PhysicalOperator op, X value) throws E{
     throw new UnsupportedOperationException(String.format(
         "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.", this
index 2474c15..120306a 100644 (file)
@@ -55,4 +55,6 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
   public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
+  
+  public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
new file mode 100644 (file)
index 0000000..67bba96
--- /dev/null
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+
+public class IteratorValidator extends AbstractSingle{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidator.class);
+
+  public IteratorValidator(PhysicalOperator child) {
+    super(child);
+    
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0f, 0f, 0f, 0f);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitIteratorValidator(this, value);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new IteratorValidator(child);
+  }
+
+}
index 67e9452..366be22 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.physical.config.*;
 import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
-import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
 import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
@@ -39,6 +38,8 @@ import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
 import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
 import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator;
 import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
+import org.apache.drill.exec.physical.impl.validate.IteratorValidatorCreator;
+import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.json.JSONScanBatchCreator;
 import org.apache.drill.exec.store.json.JSONSubScan;
@@ -73,6 +74,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SortBatchCreator sbc = new SortBatchCreator();
   private AggBatchCreator abc = new AggBatchCreator();
   private MergeJoinCreator mjc = new MergeJoinCreator();
+  private IteratorValidatorCreator ivc = new IteratorValidatorCreator();
   private RootExec root = null;
   private TraceBatchCreator tbc = new TraceBatchCreator();
 
@@ -190,12 +192,24 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     return children;
   }
 
+  @Override
+  public RecordBatch visitIteratorValidator(IteratorValidator op, FragmentContext context) throws ExecutionSetupException {
+    return ivc.getBatch(context, op, getChildren(op, context));
+  }
+  
   public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     ImplCreator i = new ImplCreator();
+    boolean isAssertEnabled = false;
+    assert isAssertEnabled = true;
+    if(isAssertEnabled){
+      root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
+    }
     root.accept(i, context);
     if (i.root == null)
       throw new ExecutionSetupException(
           "The provided fragment did not have a root node that correctly created a RootExec value.");
     return i.getRoot();
   }
+
+
 }
index 4d1e3ce..e1fb3ae 100644 (file)
@@ -91,23 +91,15 @@ public class ScreenCreator implements RootCreator<Screen>{
           return false;
       }
       case NONE: {
-        if(materializer == null){
-          // receive no results.
-          context.batchesCompleted.inc(1);
-          context.recordsCompleted.inc(incoming.getRecordCount());
-          QueryResult header = QueryResult.newBuilder() //
-              .setQueryId(context.getHandle().getQueryId()) //
-              .setRowCount(0) //
-              .setDef(RecordBatchDef.getDefaultInstance()) //
-              .setIsLastChunk(true) //
-              .build();
-          QueryWritableBatch batch = new QueryWritableBatch(header);
-          connection.sendResult(listener, batch);
-
-        }else{
-          QueryWritableBatch batch = materializer.convertNext(true);
-          connection.sendResult(listener, batch);
-        }
+        context.batchesCompleted.inc(1);
+        QueryResult header = QueryResult.newBuilder() //
+            .setQueryId(context.getHandle().getQueryId()) //
+            .setRowCount(0) //
+            .setDef(RecordBatchDef.getDefaultInstance()) //
+            .setIsLastChunk(true) //
+            .build();
+        QueryWritableBatch batch = new QueryWritableBatch(header);
+        connection.sendResult(listener, batch);
 
         return false;
       }
index ca9929f..1d1d420 100644 (file)
@@ -73,7 +73,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       switch(out){
       case STOP:
       case NONE:
-        FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
         tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
         return false;
 
index b63a4d0..0b0214a 100644 (file)
@@ -97,8 +97,15 @@ public class WireRecordBatch implements RecordBatch{
   @Override
   public IterOutcome next() {
     RawFragmentBatch batch = fragProvider.getNext();
+    
+    // skip over empty batches. we do this since these are basically control messages.
+    while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
+      batch = fragProvider.getNext();
+    }
+    
     try{
       if (batch == null) return IterOutcome.NONE;
+      
 
       logger.debug("Next received batch {}", batch);
 
index 317e705..a3d1d09 100644 (file)
@@ -83,7 +83,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   private boolean upstreamNone = false;
   private int recordCount;
   private DistributedMap<VectorAccessibleSerializable> tableMap;
-  private DistributedMultiMap mmap;
+  private DistributedMultiMap<?> mmap;
   private String mapKey;
 
   public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context){
@@ -123,6 +123,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
           case STOP:
             upstreamNone = true;
             break outer;
+        default:
+          // fall through
         }
         builder.add(incoming);
         recordsSampled += incoming.getRecordCount();
@@ -144,7 +146,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       SampleCopier copier = getCopier(sv4, sampleContainer, containerToCache, popConfig.getOrderings());
       copier.copyRecords(recordsSampled/(samplingFactor * partitions), 0, samplingFactor * partitions);
 
-      for (VectorWrapper vw : containerToCache) {
+      for (VectorWrapper<?> vw : containerToCache) {
         vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
       }
       containerToCache.setRecordCount(copier.getOutputRecords());
@@ -156,7 +158,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
       mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
       List<ValueVector> vectorList = Lists.newArrayList();
-      for (VectorWrapper vw : containerToCache) {
+      for (VectorWrapper<?> vw : containerToCache) {
         vectorList.add(vw.getValueVector());
       }
 
@@ -193,7 +195,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       Preconditions.checkState(wrap != null);
 
       // Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in the rest of this operator
-      for (VectorWrapper w : wrap.get()) {
+      for (VectorWrapper<?> w : wrap.get()) {
         partitionVectors.add(w.getValueVector());
       }
     } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
@@ -235,7 +237,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
     copier2.copyRecords(skipRecords, skipRecords, partitions - 1);
     assert copier2.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier2.getOutputRecords(), partitions);
-    for (VectorWrapper vw : candidatePartitionTable) {
+    for (VectorWrapper<?> vw : candidatePartitionTable) {
       vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
     }
     candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
@@ -324,7 +326,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
     // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are more incoming
     IterOutcome upstream = incoming.next();
-    recordCount = incoming.getRecordCount();
+    
 
     if(this.first && upstream == IterOutcome.OK) {
       throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
@@ -347,6 +349,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         return IterOutcome.STOP;
       }
       doWork(vc);
+      recordCount = vc.getRecordCount();
       return IterOutcome.OK_NEW_SCHEMA;
     }
 
@@ -361,6 +364,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       case NOT_YET:
       case STOP:
         container.zeroVectors();
+        recordCount = 0;
         return upstream;
       case OK_NEW_SCHEMA:
         try{
@@ -374,6 +378,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         // fall through.
       case OK:
         doWork(incoming);
+        recordCount = incoming.getRecordCount();
         return upstream; // change if upstream changed, otherwise normal.
       default:
         throw new UnsupportedOperationException();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
new file mode 100644 (file)
index 0000000..d6f08f1
--- /dev/null
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.validate;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class IteratorValidatorBatchIterator implements RecordBatch{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
+
+  private IterOutcome state = IterOutcome.NOT_YET;
+  private final RecordBatch incoming;
+  
+  public IteratorValidatorBatchIterator(RecordBatch incoming){
+    this.incoming = incoming;
+  }
+  
+  private void validateReadState(){
+    switch(state){
+    case OK:
+    case OK_NEW_SCHEMA:
+      return;
+    default:
+      throw new IllegalStateException(String.format("You tried to do a batch data read operation when you were in a state of %s.  You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.", state.name()));
+    }
+  }
+  
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    validateReadState();
+    return incoming.iterator();
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    validateReadState();
+    return incoming.getSchema();
+  }
+
+  @Override
+  public int getRecordCount() {
+    validateReadState();
+    return incoming.getRecordCount();
+  }
+
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    validateReadState();
+    return incoming.getSelectionVector2();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    validateReadState();
+    return incoming.getSelectionVector4();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    validateReadState();
+    return incoming.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+    validateReadState();
+    return incoming.getValueAccessorById(fieldId, clazz);
+  }
+
+  @Override
+  public IterOutcome next() {
+    if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
+    state = incoming.next();
+    return state;
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    validateReadState();
+    return incoming.getWritableBatch();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
new file mode 100644 (file)
index 0000000..5d08afb
--- /dev/null
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.validate;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, IteratorValidator config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new IteratorValidatorBatchIterator(children.iterator().next());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
new file mode 100644 (file)
index 0000000..aff71bf
--- /dev/null
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.validate;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.IteratorValidator;
+
+import com.google.common.collect.Lists;
+
+public class IteratorValidatorInjector extends
+    AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorInjector.class);
+
+  public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+    IteratorValidatorInjector inject = new IteratorValidatorInjector();
+    PhysicalOperator newOp = root.accept(inject, context);
+    
+    if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen.");
+
+    return (FragmentRoot) newOp;
+    
+  }
+
+  /**
+   * Traverse the physical plan and inject the IteratorValidator operator after every operator.
+   * 
+   * @param op
+   *          Physical operator under which the IteratorValidator operator will be injected
+   * @param context
+   *          Fragment context
+   * @return same physical operator as passed in, but its child will be an IteratorValidator operator whose child will be the
+   *         original child of this operator
+   * @throws ExecutionSetupException
+   */
+  @Override
+  public PhysicalOperator visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
+
+    List<PhysicalOperator> newChildren = Lists.newArrayList();
+    PhysicalOperator newOp = op;
+
+    /* Get the list of child operators */
+    for (PhysicalOperator child : op) {
+      newChildren.add(new IteratorValidator(child.accept(this, context)));
+    }
+
+    /* Inject the iterator validator operator */
+    if (newChildren.size() > 0)
+      newOp = op.getNewWithChildren(newChildren);
+
+    return newOp;
+  }
+
+}
index a1f22c2..41ad8b7 100644 (file)
@@ -22,15 +22,22 @@ import io.netty.buffer.ByteBuf;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 
 public class FragmentWritableBatch{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
   
+  private static RecordBatchDef EMPTY_DEF = RecordBatchDef.newBuilder().setRecordCount(0).build();
+  
   private final ByteBuf[] buffers;
   private final FragmentRecordBatch header;
-  
+
   public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){
-    this.buffers = batch.getBuffers();
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, batch.getDef(), batch.getBuffers());
+  }
+  
+  private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, RecordBatchDef def, ByteBuf... buffers){
+    this.buffers = buffers;
     FragmentHandle handle = FragmentHandle //
         .newBuilder() //
         .setMajorFragmentId(receiveMajorFragmentId) //
@@ -40,12 +47,17 @@ public class FragmentWritableBatch{
     this.header = FragmentRecordBatch //
         .newBuilder() //
         .setIsLastBatch(isLast) //
-        .setDef(batch.getDef()) //
+        .setDef(def) //
         .setHandle(handle) //
         .setSendingMajorFragmentId(sendMajorFragmentId) //
         .setSendingMinorFragmentId(sendMinorFragmentId) //
         .build();
   }
+  
+  
+  public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId){
+    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, EMPTY_DEF);
+  }
 
   public ByteBuf[] getBuffers(){
     return buffers;
@@ -53,9 +65,11 @@ public class FragmentWritableBatch{
 
   public FragmentRecordBatch getHeader() {
     return header;
+    
   }
   
 
   
   
+  
 }