TEZ-3984: Shuffle: Out of Band DME event sending causes errors (Jaume Marhuenda,...
authorJaume Marhuenda <jmarhuenda@hortonworks.com>
Mon, 10 Sep 2018 19:50:00 +0000 (12:50 -0700)
committerGopal V <gopalv@apache.org>
Mon, 10 Sep 2018 19:50:00 +0000 (12:50 -0700)
Signed-off-by: Gopal V <gopalv@apache.org>
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java

index b6fe457..9e65862 100644 (file)
@@ -21,11 +21,14 @@ package org.apache.tez.runtime.library.common.sort.impl;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputStatisticsReporter;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
@@ -68,11 +71,12 @@ public abstract class ExternalSorter {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class);
 
-  public void close() throws IOException {
+  public List<Event> close() throws IOException {
     spillFileIndexPaths.clear();
     spillFilePaths.clear();
     reportStatistics();
     outputContext.notifyProgress();
+    return Collections.emptyList();
   }
 
   public abstract void flush() throws IOException;
index 7915662..028dd2f 100644 (file)
@@ -123,6 +123,11 @@ public class PipelinedSorter extends ExternalSorter {
   private final Deflater deflater;
   private final String auxiliaryService;
 
+  /**
+   * Store the events to be send in close.
+   */
+  private final List<Event> finalEvents;
+
   // TODO Set additional countesr - total bytes written, spills etc.
 
   public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
@@ -236,6 +241,7 @@ public class PipelinedSorter extends ExternalSorter {
     keySerializer.open(span.out);
     minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
     deflater = TezCommonUtils.newBestCompressionDeflater();
+    finalEvents = Lists.newLinkedList();
   }
 
   ByteBuffer allocateSpace() {
@@ -695,8 +701,6 @@ public class PipelinedSorter extends ExternalSorter {
       }
 
       if (!isFinalMergeEnabled()) {
-        //Generate events for all spills
-        List<Event> events = Lists.newLinkedList();
 
         //For pipelined shuffle, previous events are already sent. Just generate the last event alone
         int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
@@ -705,13 +709,12 @@ public class PipelinedSorter extends ExternalSorter {
         for (int i = startIndex; i < endIndex; i++) {
           boolean isLastEvent = (i == numSpills - 1);
           String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
-          ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent,
+          ShuffleUtils.generateEventOnSpill(finalEvents, isFinalMergeEnabled(), isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats,
               reportDetailedPartitionStats(), auxiliaryService, deflater);
           LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
-        outputContext.sendEvents(events);
         return;
       }
 
@@ -850,6 +853,16 @@ public class PipelinedSorter extends ExternalSorter {
     }
   }
 
+  /**
+   * Close and send events.
+   * @return events to be returned by the edge.
+   * @throws IOException parent can throw this.
+   */
+  public final List<Event> close() throws IOException {
+    super.close();
+    return finalEvents;
+  }
+
 
   private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
     int getPartition();
index 557a538..9b5a43c 100644 (file)
@@ -753,10 +753,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   }
 
   @Override
-  public void close() throws IOException {
-    super.close();
+  public List<Event> close() throws IOException {
     kvbuffer = null;
     kvmeta = null;
+    return super.close();
   }
 
   boolean isClosed() {
index 7d3e0b4..32a4f4d 100644 (file)
@@ -181,12 +181,12 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
   @Override
   public synchronized List<Event> close() throws IOException {
-    List<Event> returnEvents = null;
+    List<Event> returnEvents = Lists.newLinkedList();
     if (sorter != null) {
       sorter.flush();
-      sorter.close();
+      returnEvents.addAll(sorter.close());
       this.endTime = System.nanoTime();
-      returnEvents = generateEvents();
+      returnEvents.addAll(generateEvents());
       sorter = null;
     } else {
       LOG.warn(getContext().getDestinationVertexName() +
index 727f8ac..bd7f585 100644 (file)
@@ -59,12 +59,14 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doReturn;
@@ -402,12 +404,15 @@ public class TestPipelinedSorter {
         initialAvailableMem);
 
     //Write 100 keys each of size 10
-    writeData(sorter, 10000, 100);
+    writeData(sorter, 10000, 100, false);
+    sorter.flush();
+    List<Event> events = sorter.close();
 
     //final merge is disabled. Final output file would not be populated in this case.
     assertTrue(sorter.finalOutputFile == null);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
-    verify(outputContext, times(1)).sendEvents(anyListOf(Event.class));
+    verify(outputContext, times(0)).sendEvents(any());
+    assertTrue(events.size() > 0);
   }
 
   @Test