TEZ-3974: Correctness regression of TEZ-955 in TEZ-2937 (Jaume Marhuenda, reviewed... master
authorJaume Marhuenda <jmarhuenda@hortonworks.com>
Thu, 9 Aug 2018 21:39:56 +0000 (14:39 -0700)
committerGopal V <gopalv@apache.org>
Thu, 9 Aug 2018 21:39:56 +0000 (14:39 -0700)
tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java

index 5c2ab77..0ac916f 100644 (file)
@@ -380,30 +380,43 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           "Can only run while in RUNNING state. Current: " + this.state);
       this.state.set(State.CLOSED);
 
+
+      List<List<Event>> allCloseInputEvents = Lists.newArrayList();
       // Close the Inputs.
       for (InputSpec inputSpec : inputSpecs) {
         String srcVertexName = inputSpec.getSourceVertexName();
         initializedInputs.remove(srcVertexName);
         List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close();
-        sendTaskGeneratedEvents(closeInputEvents,
-            EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
-            srcVertexName, taskSpec.getTaskAttemptID());
+        allCloseInputEvents.add(closeInputEvents);
       }
 
+      List<List<Event>> allCloseOutputEvents = Lists.newArrayList();
       // Close the Outputs.
       for (OutputSpec outputSpec : outputSpecs) {
         String destVertexName = outputSpec.getDestinationVertexName();
         initializedOutputs.remove(destVertexName);
         List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
-        sendTaskGeneratedEvents(closeOutputEvents,
-            EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
-            destVertexName, taskSpec.getTaskAttemptID());
+        allCloseOutputEvents.add(closeOutputEvents);
       }
 
       // Close the Processor.
       processorClosed = true;
       processor.close();
 
+      for (int i = 0; i < allCloseInputEvents.size(); i++) {
+        String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+        sendTaskGeneratedEvents(allCloseInputEvents.get(i),
+            EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+            srcVertexName, taskSpec.getTaskAttemptID());
+      }
+
+      for (int i = 0; i < allCloseOutputEvents.size(); i++) {
+        String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+        sendTaskGeneratedEvents(allCloseOutputEvents.get(i),
+            EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+            destVertexName, taskSpec.getTaskAttemptID());
+      }
+
     } finally {
       setTaskDone();
       // Clear the interrupt status since the task execution is done.
index c1bb3a1..599f98f 100644 (file)
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -62,6 +64,7 @@ import org.junit.Test;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
+import org.mockito.Mockito;
 
 public class TestLogicalIOProcessorRuntimeTask {
 
@@ -77,10 +80,14 @@ public class TestLogicalIOProcessorRuntimeTask {
         ScalingAllocator.class.getName());
 
     TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
-    TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30);
+    TaskSpec task1 = createTaskSpec(taId1, "dag1",
+        "vertex1", 30, TestProcessor.class.getName(),
+        TestOutput.class.getName());
 
     TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2);
-    TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1", 10);
+    TaskSpec task2 = createTaskSpec(taId2, "dag2",
+        "vertex1", 10, TestProcessor.class.getName(),
+        TestOutput.class.getName());
 
     TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
     LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
@@ -142,6 +149,50 @@ public class TestLogicalIOProcessorRuntimeTask {
 
   }
 
+  /**
+   * We should expect no events being sent to the AM if an
+   * exception happens in the close method of the processor
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testExceptionHappensInClose() throws Exception {
+    TezDAGID dagId = createTezDagId();
+    TezVertexID vertexId = createTezVertexId(dagId);
+    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+    Multimap<String, String> startedInputsMap = HashMultimap.create();
+    TezUmbilical umbilical = mock(TezUmbilical.class);
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
+        ScalingAllocator.class.getName());
+
+    TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1);
+    TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30,
+        FaultyTestProcessor.class.getName(),
+        TestOutputWithEvents.class.getName());
+
+    TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf);
+    LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
+        umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
+        "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true,
+        new DefaultHadoopShim(), sharedExecutor);
+
+    try {
+      lio1.initialize();
+      lio1.run();
+
+      try {
+        lio1.close();
+        fail("RuntimeException should have been thrown");
+      } catch (RuntimeException e) {
+        // No events should be sent thorught the umbilical protocol
+        Mockito.verify(umbilical, Mockito.never()).addEvents(Mockito.anyList());
+      }
+    } finally {
+      sharedExecutor.shutdownNow();
+      cleanupAndTest(lio1);
+    }
+  }
+
   private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) throws InterruptedException {
 
     ProcessorContext procContext = lio.getProcessorContext();
@@ -175,7 +226,7 @@ public class TestLogicalIOProcessorRuntimeTask {
       assertEquals(0, lio.outputSpecs.size());
       assertTrue(lio.groupInputSpecs == null || lio.groupInputSpecs.size() == 0);
     }
-    
+
     assertEquals(0, lio.inputsMap.size());
     assertEquals(0, lio.inputContextMap.size());
     assertEquals(0, lio.outputsMap.size());
@@ -190,11 +241,12 @@ public class TestLogicalIOProcessorRuntimeTask {
   }
 
   private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID,
-      String dagName, String vertexName, int parallelism) {
-    ProcessorDescriptor processorDesc = createProcessorDescriptor();
+      String dagName, String vertexName, int parallelism,
+      String processorClassname, String outputClassName) {
+    ProcessorDescriptor processorDesc = createProcessorDescriptor(processorClassname);
     TaskSpec taskSpec = new TaskSpec(taskAttemptID,
         dagName, vertexName, parallelism, processorDesc,
-        createInputSpecList(), createOutputSpecList(), null, null);
+        createInputSpecList(), createOutputSpecList(outputClassName), null, null);
     return taskSpec;
   }
 
@@ -204,14 +256,14 @@ public class TestLogicalIOProcessorRuntimeTask {
     return Lists.newArrayList(inputSpec);
   }
 
-  private List<OutputSpec> createOutputSpecList() {
-    OutputDescriptor outputtDesc = OutputDescriptor.create(TestOutput.class.getName());
+  private List<OutputSpec> createOutputSpecList(String outputClassName) {
+    OutputDescriptor outputtDesc = OutputDescriptor.create(outputClassName);
     OutputSpec outputSpec = new OutputSpec("outedge", outputtDesc, 1);
     return Lists.newArrayList(outputSpec);
   }
 
-  private ProcessorDescriptor createProcessorDescriptor() {
-    ProcessorDescriptor desc = ProcessorDescriptor.create(TestProcessor.class.getName());
+  private ProcessorDescriptor createProcessorDescriptor(String className) {
+    ProcessorDescriptor desc = ProcessorDescriptor.create(className);
     return desc;
   }
 
@@ -248,15 +300,25 @@ public class TestLogicalIOProcessorRuntimeTask {
       getContext().notifyProgress();
     }
 
-       @Override
-       public void handleEvents(List<Event> processorEvents) {
-               
-       }
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+    }
 
-       @Override
-       public void close() throws Exception {
-               
-       }
+    @Override
+    public void close() throws Exception {
+    }
+
+  }
+
+  public static class FaultyTestProcessor extends TestProcessor {
+    public FaultyTestProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void close() throws Exception {
+      throw new RuntimeException();
+    }
 
   }
 
@@ -336,6 +398,22 @@ public class TestLogicalIOProcessorRuntimeTask {
     public List<Event> close() throws Exception {
       return null;
     }
+  }
+
+  public static class TestOutputWithEvents extends TestOutput {
+
+    public static volatile int startCount = 0;
+    public static volatile int vertexParallelism;
+
+    public TestOutputWithEvents(OutputContext outputContext, int numPhysicalOutputs) {
+      super(outputContext, numPhysicalOutputs);
+    }
 
+    @Override
+    public List<Event> close() throws Exception {
+      return Arrays.asList(
+          CompositeDataMovementEvent.create(0,
+              0, null));
+    }
   }
 }