TEZ-3824. MRCombiner creates new JobConf copy per spill (Jonathan Eagles via jlowe)
authorJason Lowe <jlowe@apache.org>
Mon, 14 May 2018 18:17:43 +0000 (13:17 -0500)
committerJason Lowe <jlowe@apache.org>
Mon, 14 May 2018 18:17:43 +0000 (13:17 -0500)
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java

index 9514215..adfd24d 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.combine;
 
 import java.io.IOException;
 
+import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -78,7 +79,13 @@ public class MRCombiner implements Combiner {
   private final TaskAttemptID mrTaskAttemptID;
 
   public MRCombiner(TaskContext taskContext) throws IOException {
-    this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    final Configuration userConf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
+    useNewApi = ConfigUtils.useNewApi(userConf);
+    if (useNewApi) {
+      conf = new JobConf(userConf);
+    } else {
+      conf = userConf;
+    }
 
     assert(taskContext instanceof InputContext || taskContext instanceof OutputContext);
     if (taskContext instanceof OutputContext) {
@@ -93,8 +100,6 @@ public class MRCombiner implements Combiner {
       this.reporter = new MRTaskReporter((InputContext)taskContext);
     }
 
-    this.useNewApi = ConfigUtils.useNewApi(conf);
-    
     combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
     combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);